[GitHub] nifi issue #2794: NIFI-4579: Fix ValidateRecord type coercing
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2794 +1 LGTM, verified the unit test illustrates the behavior and the patch fixes it. Thanks for the fix! Merging to master ---
[GitHub] nifi pull request #3217: NIFI-3988: Add fragment attributes to SplitRecord
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3217

NIFI-3988: Add fragment attributes to SplitRecord

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-3988

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/3217.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3217

commit 1b95cd85fe623e6f981340dbfdcdc3c1034dbd5d
Author: Matthew Burgess
Date:   2018-12-13T19:02:14Z

    NIFI-3988: Add fragment attributes to SplitRecord

---
[GitHub] nifi issue #3216: NIFI-5891 fix handling of null logical types in Hive3Strea...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3216 +1 LGTM, verified the unit tests illustrate the problem and the patch fixes it. There are 2 CheckStyle violations but I will fix them on merge. Thanks for the fix! Merging to master ---
[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2521#discussion_r239241291

--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) thro
         String sessionId = livyController.get("sessionId");
         String livyUrl = livyController.get("livyUrl");
-        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(code)) {
-            try (InputStream inputStream = session.read(flowFile)) {
-                // If no code was provided, assume it is in the content of the incoming flow file
-                code = IOUtils.toString(inputStream, charset);
-            } catch (IOException ioe) {
-                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-        }
-        code = StringEscapeUtils.escapeJavaScript(code);
-        String payload = "{\"code\":\"" + code + "\"}";
+
         try {
-            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-            if (result == null) {
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+
+            if (isBatchJob) {
+
+                String jsonResponse = null;
+
+                if (StringUtils.isEmpty(jsonResponse)) {
+                    try (InputStream inputStream = session.read(flowFile)) {
+                        // If no code was provided, assume it is in the content of the incoming flow file
+                        jsonResponse = IOUtils.toString(inputStream, charset);
+                    } catch (IOException ioe) {
+                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                log.debug(" > jsonResponse: " + jsonResponse);
+
                 try {
-                    final JSONObject output = result.getJSONObject("data");
-                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    session.transfer(flowFile, REL_SUCCESS);
-                } catch (JSONException je) {
-                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+                    Map headers = new HashMap<>();
+                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+                    headers.put("X-Requested-By", LivySessionService.USER);
+                    headers.put("Accept", "application/json");
+
+                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                     flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    flowFile = session.penalize(flowFile);
+
+                    Thread.sleep(statusCheckInterval);
+
+                    String state = jobInfo.getString("state");
+                    log.debug(" > jsonResponseObj State: " + state);
+
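The truncated hunk above switches the batch-job path to sleeping for `statusCheckInterval` and re-reading the job's `state` until it reaches a terminal value. A minimal, self-contained sketch of that polling pattern, with all names illustrative rather than the NiFi or Livy API (`fetchState` stands in for re-fetching Livy's job-info JSON, and the terminal state names follow Livy's batch session lifecycle):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class StatusPoller {
    // Polls a state supplier until a terminal state is seen, sleeping between checks.
    static String pollUntilDone(Supplier<String> fetchState, long statusCheckIntervalMs) throws InterruptedException {
        while (true) {
            String state = fetchState.get();
            if ("success".equals(state) || "dead".equals(state)) {
                return state; // terminal batch-job states
            }
            Thread.sleep(statusCheckIntervalMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated job: "starting" -> "running" -> "success"
        Iterator<String> states = List.of("starting", "running", "success").iterator();
        System.out.println(pollUntilDone(states::next, 1L)); // prints "success"
    }
}
```

The real processor also has to bound the loop (or route to failure) rather than spin forever if the job never reaches a terminal state.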
[GitHub] nifi pull request #3191: NIFI-5855: Remove unnecessary ORDER BY clause in Ge...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3191

NIFI-5855: Remove unnecessary ORDER BY clause in GenerateTableFetch when Partition Size is zero

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-5855

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/3191.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3191

commit 78a3c9badf4724691022081413bbb0a6de2f960e
Author: Matthew Burgess
Date:   2018-11-29T22:46:24Z

    NIFI-5855: Remove unnecessary ORDER BY clause in GenerateTableFetch when Partition Size is zero

---
[GitHub] nifi pull request #3188: NIFI-5829 Create Lookup Controller Services for Rec...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3188#discussion_r237218520

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "parse", "record", "row", "reader"})
+@SeeAlso({RecordSetWriterLookup.class})
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " +
+        "requires an attribute named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " +
+        "if the attribute is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " +
+        "registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' attribute.")
+@DynamicProperty(name = "The ", value = "RecordWriterFactory property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
+        description = "")
+public class ReaderLookup extends AbstractControllerService implements RecordReaderFactory {
--- End diff --

Now that controller services can have access to attributes, it would be nice to add support in the documentation for `ReadsAttributes` annotations. You don't need to do that here, just leaving it as a reminder for a future improvement Jira :)

---
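The CapabilityDescription above says the service dispatches to a registered RecordReaderFactory by the flow file's `recordreader.name` attribute and throws if the attribute is missing. A plain-Java model of that dispatch behavior (the registry map, exception type, and method names are illustrative assumptions, not the NiFi API):

```java
import java.util.HashMap;
import java.util.Map;

public class LookupSketch {
    // Registered "factories" keyed by name; the String values stand in for RecordReaderFactory instances.
    private final Map<String, String> registry = new HashMap<>();

    void register(String name, String factory) {
        registry.put(name, factory);
    }

    // Selects a factory from the flow file's attributes, as the CapabilityDescription describes.
    String lookup(Map<String, String> flowFileAttributes) {
        String name = flowFileAttributes.get("recordreader.name");
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Attribute 'recordreader.name' must be set");
        }
        String factory = registry.get(name);
        if (factory == null) {
            throw new IllegalArgumentException("No reader registered with name '" + name + "'");
        }
        return factory;
    }

    public static void main(String[] args) {
        LookupSketch lookup = new LookupSketch();
        lookup.register("csv", "CSVReader");
        lookup.register("json", "JsonTreeReader");
        System.out.println(lookup.lookup(Map.of("recordreader.name", "csv"))); // prints "CSVReader"
    }
}
```

In the real service the registry is populated from the user-defined dynamic properties, which is why the @DynamicProperty documentation discussed in this review matters.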
[GitHub] nifi pull request #3188: NIFI-5829 Create Lookup Controller Services for Rec...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3188#discussion_r237218092

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "parse", "record", "row", "reader"})
+@SeeAlso({RecordSetWriterLookup.class})
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " +
+        "requires an attribute named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " +
+        "if the attribute is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " +
+        "registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' attribute.")
+@DynamicProperty(name = "The ", value = "RecordWriterFactory property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
--- End diff --

I realize this was copy-pasted from DBCPConnectionPoolLookup, but I think it makes the documentation choppy; IMO it would be better to have the name be something like `` and the value something like `A RecordReader controller service`. Also, this is ReaderLookup but the current value references RecordWriterFactory.

---
[GitHub] nifi issue #3186: NIFI-5843 added subjects to the error message to make it c...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3186 +1 LGTM, thanks for the improvement! Merging to master ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237138338

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();

+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile is transferred to this relationship if the operation failed.")
+            .description("CQL query execution failed.")
--- End diff --

I'd prefer `CQL operation` rather than `query`, as PutCassandra is more of a statement than a query, and this base processor could be used to do other things. Technically a processor wouldn't have to use CQL (although I'm not sure of other ways to interact with Cassandra), and I think users would be comfortable with the term CQL to mean Cassandra even if it didn't use CQL.

---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237138928

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();

+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
--- End diff --

Maybe `CQL operation` here instead of `query execution`, see comments below

---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237141917

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile inputFlowFile = null;
         FlowFile fileToProcess = null;
+
+        Map attributes = null;
+
         if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
+            inputFlowFile = session.get();

             // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
             // However, if we have no FlowFile and we have connections coming from other Processors, then
             // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
+            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                 return;
             }
+
+            attributes = inputFlowFile.getAttributes();
         }

         final ComponentLog logger = getLogger();
-        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
         final StopWatch stopWatch = new StopWatch(true);

-        if (fileToProcess == null) {
-            fileToProcess = session.create();
+        if(inputFlowFile != null){
+            session.transfer(inputFlowFile, REL_ORIGINAL);
         }

         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
+
             final AtomicLong nrOfRows = new AtomicLong(0L);
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAv
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237152109

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -269,11 +313,17 @@ public void process(final OutputStream out) throws IOException {
                 // cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
                 // logger message above).
                 getLogger().error(nhae.getCustomMessage(10, true, false));
+                if (fileToProcess == null) {
+                    fileToProcess = session.create();
+                }
                 fileToProcess = session.penalize(fileToProcess);
                 session.transfer(fileToProcess, REL_RETRY);
-        } catch (final QueryExecutionException qee) {
+                //session.rollback();
--- End diff --

There are a few spots of commented-out code, please remove if not necessary, thanks!

---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237138678

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();

+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile is transferred to this relationship if the operation failed.")
+            .description("CQL query execution failed.")
             .build();
+
     public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
-            .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
-                    + "it again may succeed.")
+            .description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting "
--- End diff --

I tend to like `operation` here (see above), and I like that you replaced `it` with `operation` for clarity.

---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r237147586

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {

     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile inputFlowFile = null;
         FlowFile fileToProcess = null;
+
+        Map attributes = null;
+
         if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
+            inputFlowFile = session.get();

             // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
             // However, if we have no FlowFile and we have connections coming from other Processors, then
             // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
+            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                 return;
             }
+
+            attributes = inputFlowFile.getAttributes();
         }

         final ComponentLog logger = getLogger();
-        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
         final StopWatch stopWatch = new StopWatch(true);

-        if (fileToProcess == null) {
-            fileToProcess = session.create();
+        if(inputFlowFile != null){
+            session.transfer(inputFlowFile, REL_ORIGINAL);
         }

         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
+
             final AtomicLong nrOfRows = new AtomicLong(0L);
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAv
[GitHub] nifi pull request #3184: NIFI-5845: Add support for OTHER and SQLXML JDBC ty...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3184#discussion_r237101820

--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java ---
@@ -402,6 +409,9 @@ public static long convertToCsvStream(final ResultSet rs, final OutputStream out
                         rowValues.add("");
                     }
                     break;
+                case SQLXML:
+                    rowValues.add(StringEscapeUtils.escapeCsv(((java.sql.SQLXML) value).getString()));
--- End diff --

Yep, missed that (nulls are handled earlier in the Avro code, but individually in the CSV code); will add both, good catch! I had tested with nulls, but only for Avro output.

---
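The fix agreed on here is to null-check the SQLXML value before calling `getString()` and CSV-escape the result. A simplified, dependency-free sketch of that null-safe escaping; the real code uses Commons Lang's `StringEscapeUtils.escapeCsv`, which is re-implemented here by hand for illustration:

```java
public class CsvCell {
    // Returns an empty cell for null; otherwise CSV-escapes the value:
    // quote the field if it contains a comma, quote, or newline, doubling embedded quotes.
    static String toCsvCell(String value) {
        if (value == null) {
            return ""; // the null check the review comment asks for
        }
        if (value.contains(",") || value.contains("\"") || value.contains("\n")) {
            return "\"" + value.replace("\"", "\"\"") + "\"";
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(toCsvCell("<a>1,2</a>")); // quoted because of the embedded comma
        System.out.println(toCsvCell(null).isEmpty()); // true
    }
}
```

The same null check is needed before the cast in the diff above, since `ResultSet.getObject` returns null for SQL NULL and calling `getString()` on it would throw a NullPointerException.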
[GitHub] nifi pull request #3184: NIFI-5845: Add support for OTHER and SQLXML JDBC ty...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3184

NIFI-5845: Add support for OTHER and SQLXML JDBC types to SQL/Hive processors

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5845 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3184.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3184 commit a8009f9a0010b99703713a38263006a7a9e5f5d5 Author: Matthew Burgess Date: 2018-11-27T23:35:29Z NIFI-5845: Add support for OTHER and SQLXML JDBC types to SQL/Hive processors ---
[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3179#discussion_r236410289 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java --- @@ -148,7 +148,23 @@ public void constructProcess() { if (e instanceof SQLNonTransientException) { return ErrorTypes.InvalidInput; } else if (e instanceof SQLException) { -return ErrorTypes.TemporalFailure; +// Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes +int errorCode = ((SQLException) e).getErrorCode(); --- End diff -- I'll add the debug logging for now, we can add errorCode possibly to an attribute (as was just done for some SQL processor(s)) under a separate Jira? ---
[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3179#discussion_r236410267 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java --- @@ -148,7 +148,23 @@ public void constructProcess() { if (e instanceof SQLNonTransientException) { return ErrorTypes.InvalidInput; } else if (e instanceof SQLException) { -return ErrorTypes.TemporalFailure; +// Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes +int errorCode = ((SQLException) e).getErrorCode(); +if (errorCode >= 1 && errorCode < 2) { +return ErrorTypes.InvalidInput; +} else if (errorCode >= 2 && errorCode < 3) { +return ErrorTypes.TemporalFailure; --- End diff -- Good point, will change. ---
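The vendor-code routing under discussion in this diff can be sketched as a small classifier. The band boundaries below mirror the shape of the patch (Hive groups its error codes into numeric ranges; see Hive's `ErrorMsg` class), but the exact cutoffs and the `UNKNOWN_FAILURE` fallback here are illustrative, not the merged values.

```java
public class HiveErrorRouting {

    public enum ErrorType { INVALID_INPUT, TEMPORAL_FAILURE, UNKNOWN_FAILURE }

    // Map a SQLException vendor-code band to a NiFi-style error category.
    public static ErrorType classify(int errorCodeBand) {
        if (errorCodeBand >= 1 && errorCodeBand < 2) {
            return ErrorType.INVALID_INPUT;    // caller error: retrying won't help
        } else if (errorCodeBand >= 2 && errorCodeBand < 3) {
            return ErrorType.TEMPORAL_FAILURE; // transient: safe to retry later
        }
        return ErrorType.UNKNOWN_FAILURE;      // unmapped code: surface as failure
    }

    public static void main(String[] args) {
        // e.g. a SQLException.getErrorCode() value, bucketed into a band
        System.out.println(classify(1));
        System.out.println(classify(2));
    }
}
```

Routing on the vendor code rather than treating every `SQLException` as transient is the point of the patch: it keeps genuinely bad input out of the retry loop.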
[GitHub] nifi issue #3167: NIFI-5812: Marked Database processors as 'PrimaryNodeOnly'
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3167 Removing the PrimaryNodeOnly annotation from GTF causes a Checkstyle violation due to the unused import. Can you remove the import statement as well? Wouldn't hurt to rebase (not merge) as well. Please and thanks! ---
[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3179 NIFI-5834: Restore default PutHiveQL error handling behavior Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5834 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3179.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3179 commit 3e1b98b74476bc534f7039580cfde25bd733e773 Author: Matthew Burgess Date: 2018-11-20T22:58:59Z NIFI-5834: Restore default PutHiveQL error handling behavior ---
[GitHub] nifi issue #3177: NIFI-5828: Documents behavior of ExecuteSQL attrs when Max...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3177 @colindean mind closing this PR? Doesn't look like the comment in the commit caused it to auto-close (maybe the period after the PR number?) Please and thanks! Also I updated the Fix Version to be 1.9.0 as 1.8.0 was already released ---
[GitHub] nifi issue #3128: NIFI-5788: Introduce batch size limit in PutDatabaseRecord...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3128 +1 LGTM, tested with various batch sizes and ran unit tests. Thanks for this improvement! Merged to master ---
[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3075#discussion_r233598457 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java --- @@ -450,7 +450,33 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory // If there are no SQL statements to be generated, still output an empty flow file if specified by the user if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { -session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS); +FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess); +Map attributesToAdd = new HashMap<>(); + +attributesToAdd.put("generatetablefetch.tableName", tableName); +if (columnNames != null) { + attributesToAdd.put("generatetablefetch.columnNames", columnNames); +} +whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND "); +if (StringUtils.isNotBlank(whereClause)) { --- End diff -- Yeah that's probably left over from before we added the "1=1", I'll take a look and clean it up ---
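The WHERE-clause construction under review can be sketched as below: once the `"1=1"` fallback is in place, the result is never blank, which is why the `isNotBlank` guard flagged in this comment is dead code. Method and variable names here are illustrative.

```java
import java.util.List;

public class WhereClauseSketch {

    // Join the accumulated max-value clauses, or fall back to the no-op
    // predicate "1=1" so that downstream SQL ("... WHERE " + clause)
    // stays syntactically valid when there are no bounds to apply.
    public static String buildWhereClause(List<String> maxValueClauses) {
        return maxValueClauses.isEmpty() ? "1=1" : String.join(" AND ", maxValueClauses);
    }

    public static void main(String[] args) {
        System.out.println(buildWhereClause(List.of()));                     // 1=1
        System.out.println(buildWhereClause(List.of("id > 10", "id <= 50")));
    }
}
```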
[GitHub] nifi pull request #3170: NIFI-5652: Fixed LogMessage when logging level is d...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3170 NIFI-5652: Fixed LogMessage when logging level is disabled I couldn't write a unit test as I can't change the log level in between unit tests. I reproduced the issue and verified it is no longer present with a live NiFi instance running with this patch. ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5652 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3170.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3170 commit 6c08aea9a7394237d5e575086fe965001e3700d7 Author: Matthew Burgess Date: 2018-11-14T15:50:37Z NIFI-5652: Fixed LogMessage when logging level is disabled ---
[GitHub] nifi issue #1953: NIFI-4130 Add lookup controller service in TransformXML to...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/1953 +1 LGTM, thanks for the review @bdesert and the improvement @pvillard31 ! Merging to master ---
[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3075#discussion_r233142376 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java --- @@ -434,48 +448,53 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1); } -// Generate SQL statements to read "pages" of data -Long limit = partitionSize == 0 ? null : (long) partitionSize; -final String fragmentIdentifier = UUID.randomUUID().toString(); -for (long i = 0; i < numberOfFetches; i++) { -// Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit) -if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) { -maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning); -limit = null; -} +// If there are no SQL statements to be generated, still output an empty flow file if specified by the user +if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) { +session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS); --- End diff -- Good call, will make the change. ---
[GitHub] nifi issue #3130: NIFI-5791: Add Apache Daffodil (incubating) bundle
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3130 This is interesting stuff, I need to read more up on DFDL, might make a good schema definition language as an alternative to Avro schemas. Also at first glance it seems the Daffodil infoset is analogous to a NiFi Record for data, and/or a NiFi RecordSchema for its schema. Having an infoset as the underlying data model seems to overlap a bit with how the NiFi Record stuff works, for example we could have a PCAPReader and any writer such as JSONWriter, then the ConvertRecord processor can convert PCAP to JSON. I think someone was working on a PCAP reader, not sure how they handled the global stuff. If Avro schemas are not sufficient to capture this kind of model, then we should definitely look at DFDL as an alternative (we've been considering XSD as a more expressive alternative). Quick note about JOLT, the "normal" use case is for JSON-to-JSON conversion, but the underlying library does the transformation on a POJO, not JSON. So the usual entry point for JOLT is to give it a JSON string, it converts to a POJO, runs the transformation, and takes the resulting POJO and writes it out as JSON. In NiFi I wrote JoltTransformRecord, which instead of explicit JSON uses the NiFi Record API, so it gets each record (in whichever format) as a NiFi Record, translates it to a POJO, then runs the JOLT transforms, then translates back into a NiFi Record, which is written out in whatever format is specified by the RecordWriter. ---
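The JoltTransformRecord pipeline described above can be sketched as follows. JOLT operates on plain Maps/Lists (POJOs), so the only difference between the JSON and Record variants is what sits at the edges: JSON (de)serialization vs. Record-to-POJO translation. The `transform` below is a trivial stand-in for a real Jolt `Chainr`, and the Map-based "record" is illustrative, not NiFi's actual Record API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class JoltPipelineSketch {

    // Edge 1: record -> POJO (in NiFi this walks the Record's schema).
    public static Map<String, Object> recordToPojo(Map<String, Object> record) {
        return new LinkedHashMap<>(record);
    }

    // Edge 2: POJO -> record, handed to whatever RecordWriter is configured.
    public static Map<String, Object> pojoToRecord(Map<String, Object> pojo) {
        return new LinkedHashMap<>(pojo);
    }

    public static void main(String[] args) {
        // Stand-in for Chainr.transform(pojo): rename key "src" to "dest".
        UnaryOperator<Map<String, Object>> transform = pojo -> {
            Map<String, Object> out = new LinkedHashMap<>();
            out.put("dest", pojo.get("src"));
            return out;
        };

        Map<String, Object> record = Map.of("src", "value");
        Map<String, Object> result = pojoToRecord(transform.apply(recordToPojo(record)));
        System.out.println(result); // {dest=value}
    }
}
```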
[GitHub] nifi issue #3167: NIFI-5812: Marked Database processors as 'PrimaryNodeOnly'
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3167 I did something like that for the Hive 1 version of PutHiveStreaming, the underlying library wasn't thread-safe if you were working on the same table, so I put in a "table lock" where multiple threads couldn't act on the same table. With QDT it doesn't take incoming flow files, so the table name is effectively hard-coded. By adding PrimaryNodeOnly we can guarantee that QDT only has one instance (it is already TriggerSerially so can't have multiple threads). For GTF if you use ListDatabaseTables on the primary node only (not sure if we should force that with this annotation or not) then each flow file should have a different table, and using a load-balanced connection (or RPG -> Input Port) then each instance of GTF should be working on a different table. The onus is on the user to set Max Concurrent Tasks for GTF to 1 to prevent multi-threaded execution. Perhaps for GTF, instead of forcing PrimaryNodeOnly, we can make it clear in the doc that if there are no incoming connections, it should probably be run on the primary node only. Alternatively, maybe during annotation processing we can enforce that processors annotated with PrimaryNodeOnly automatically have an InputRequirement of INPUT_FORBIDDEN? Otherwise flow files can get stalled if they are in the queue on a node that is not the primary. ---
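The enforcement idea floated above (PrimaryNodeOnly should imply INPUT_FORBIDDEN, so flow files can't strand in queues on non-primary nodes) can be sketched as a reflective consistency check. The annotations below are local stand-ins for NiFi's, and in NiFi such a check would live in framework-side component validation, not user code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class PrimaryNodeOnlyCheck {

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    public @interface PrimaryNodeOnly {}

    public enum Requirement { INPUT_REQUIRED, INPUT_ALLOWED, INPUT_FORBIDDEN }

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    public @interface InputRequirement { Requirement value(); }

    @PrimaryNodeOnly
    @InputRequirement(Requirement.INPUT_FORBIDDEN)
    public static class QueryDatabaseTableLike {}          // consistent

    @PrimaryNodeOnly
    @InputRequirement(Requirement.INPUT_ALLOWED)
    public static class BadProcessor {}                    // accepts input: inconsistent

    // A @PrimaryNodeOnly processor is consistent only if it forbids input.
    public static boolean isConsistent(Class<?> processor) {
        if (processor.getAnnotation(PrimaryNodeOnly.class) == null) {
            return true; // nothing to enforce
        }
        InputRequirement req = processor.getAnnotation(InputRequirement.class);
        return req != null && req.value() == Requirement.INPUT_FORBIDDEN;
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(QueryDatabaseTableLike.class)); // true
        System.out.println(isConsistent(BadProcessor.class));           // false
    }
}
```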
[GitHub] nifi pull request #3167: NIFI-5812: Marked Database processors as 'PrimaryNo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3167#discussion_r232697514 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java --- @@ -114,6 +114,7 @@ + "max value for max value columns. Properties should be added in the format `initial.maxvalue.`. This value is only used the first time " + "the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table " + "specified in the flow files.") +@PrimaryNodeOnly --- End diff -- Since GenerateTableFetch can accept incoming flow files, I don't think we should restrict this to run only on the primary node. For example, you could do a ListDatabaseTables, then distribute the flow files among the cluster, where each node's downstream GTF could do the fetch in parallel. In fact that's the main reason we have GTF rather than just QueryDatabaseTable. ---
[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3133#discussion_r232011596 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java --- @@ -164,6 +166,77 @@ public ValidationResult validate(final String subject, final String input, final .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() +.displayName("Minimum Idle Connections") +.name("dbcp-mim-idle-conns") +.description("The minimum number of connections that can remain idle in the pool, without extra ones being " + +"created, or zero to create none.") + .defaultValue(String.valueOf(GenericObjectPoolConfig.DEFAULT_MIN_IDLE)) --- End diff -- I didn't realize the constants were only available from a class in an "impl" package in DBCP. In that case I take back what I said :( Can we create our own constants here with a comment that we got the values from DBCP itself? ---
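The suggestion above (mirror the pool defaults as local constants rather than importing them from commons-pool2's "impl" package) might look like the sketch below. The values shown match the defaults documented for `GenericObjectPoolConfig` and DBCP's `BasicDataSource` around the time of this review, but they should be re-checked against the pool2/DBCP version actually on the classpath.

```java
public class DbcpDefaults {

    // Values copied from org.apache.commons.pool2.impl.GenericObjectPoolConfig
    // and org.apache.commons.dbcp2.BasicDataSource, duplicated here so we don't
    // depend on an impl package for compile-time constants.
    public static final int DEFAULT_MIN_IDLE = 0;
    public static final int DEFAULT_MAX_IDLE = 8;
    public static final long DEFAULT_MAX_CONN_LIFETIME_MILLIS = -1L;          // unlimited
    public static final long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = -1L; // evictor off
    public static final long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 1000L * 60L * 30L; // 30 min

    public static void main(String[] args) {
        // These are the strings a PropertyDescriptor.Builder#defaultValue would receive.
        System.out.println(String.valueOf(DEFAULT_MIN_IDLE));
        System.out.println(String.valueOf(DEFAULT_MAX_IDLE));
    }
}
```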
[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3133#discussion_r231582876 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java --- @@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() +.displayName("Minimum Idle Connections") +.name("dbcp-mim-idle-conns") +.description("The minimum number of connections that can remain idle in the pool, without extra ones being " + +"created, or zero to create none.") +.defaultValue("0") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() +.displayName("Max Idle Connections") +.name("dbcp-max-idle-conns") +.description("The maximum number of connections that can remain idle in the pool, without extra ones being " + +"released, or negative for no limit.") +.defaultValue("8") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() +.displayName("Max Connection Lifetime") +.name("dbcp-max-conn-lifetime") +.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + +"connection will fail the next activation, passivation or validation test. A value of zero or less " + +"means the connection has an infinite lifetime.") +.defaultValue("-1") +.required(true) +.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) +.build(); + +public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() +.displayName("Time Between Eviction Runs") +.name("dbcp-time-between-eviction-runs") +.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + +"non-positive, no idle connection evictor thread will be run.") +.defaultValue("-1") +.required(true) +.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) +.build(); + +public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder() +.displayName("Minimum Evictable Idle Time") +.name("dbcp-min-evictable-idle-time") +.description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.") +.defaultValue("1800 secs") +.required(true) +.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) --- End diff -- The validator supports expression language, so I think these properties should support expression language at the Variable Registry level ---
[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3133#discussion_r231582349 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java --- @@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() +.displayName("Minimum Idle Connections") +.name("dbcp-mim-idle-conns") +.description("The minimum number of connections that can remain idle in the pool, without extra ones being " + +"created, or zero to create none.") +.defaultValue("0") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() +.displayName("Max Idle Connections") +.name("dbcp-max-idle-conns") +.description("The maximum number of connections that can remain idle in the pool, without extra ones being " + +"released, or negative for no limit.") +.defaultValue("8") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() +.displayName("Max Connection Lifetime") +.name("dbcp-max-conn-lifetime") +.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + +"connection will fail the next activation, passivation or validation test. A value of zero or less " + +"means the connection has an infinite lifetime.") +.defaultValue("-1") +.required(true) +.addValidator(CUSTOM_TIME_PERIOD_VALIDATOR) +.build(); + +public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder() +.displayName("Time Between Eviction Runs") +.name("dbcp-time-between-eviction-runs") +.description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " + +"non-positive, no idle connection evictor thread will be run.") +.defaultValue("-1") --- End diff -- If the default values are available as constants from DBCP, we should probably use those. If we want to limit "zero or less" to just zero, we can just do an additional Math.max() ---
[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3133#discussion_r231580669 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java --- @@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() +.displayName("Minimum Idle Connections") +.name("dbcp-mim-idle-conns") +.description("The minimum number of connections that can remain idle in the pool, without extra ones being " + +"created, or zero to create none.") +.defaultValue("0") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() +.displayName("Max Idle Connections") +.name("dbcp-max-idle-conns") +.description("The maximum number of connections that can remain idle in the pool, without extra ones being " + +"released, or negative for no limit.") +.defaultValue("8") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder() +.displayName("Max Connection Lifetime") +.name("dbcp-max-conn-lifetime") +.description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " + +"connection will fail the next activation, passivation or validation test. A value of zero or less " + +"means the connection has an infinite lifetime.") +.defaultValue("-1") --- End diff -- Even if the API allows zero or less, we could just say zero means infinite and use a NONNEGATIVE_INTEGER_VALIDATOR ---
[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3133#discussion_r231580368 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java --- @@ -164,6 +161,71 @@ public ValidationResult validate(final String subject, final String input, final .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder() +.name("Minimum Idle Connections") +.description("The minimum number of connections that can remain idle in the pool, without extra ones being " + +"created, or zero to create none.") +.defaultValue("0") +.required(true) +.addValidator(StandardValidators.INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder() +.name("Max Idle Connections") +.description("The maximum number of connections that can remain idle in the pool, without extra ones being " + +"released, or negative for no limit.") +.defaultValue("8") --- End diff -- IMO we should keep the default value as the one used when you don't explicitly set it, I guess that's 8? ---
[GitHub] nifi issue #2287: NIFI-4625 - Added External Version to the PutElastic5 Proc...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2287 There has been some recent interest in this feature, do you think you'll be able to continue with this? Seems like a good feature to add. ---
[GitHub] nifi pull request #3128: NIFI-5788: Introduce batch size limit in PutDatabas...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3128#discussion_r230812123 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java --- @@ -265,6 +265,17 @@ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("put-db-record-batch-size") +.displayName("Bulk Size") +.description("Specifies batch size for INSERT and UPDATE statements. This parameter has no effect for other statements specified in 'Statement Type'." ++ " Non-positive value has the effect of infinite bulk size.") +.defaultValue("-1") --- End diff -- What does a value of zero do? Would anyone ever use it? If not, perhaps zero is the best default to indicate infinite bulk size. If you do change it to zero, please change the validator to a NONNEGATIVE_INTEGER_VALIDATOR to match ---
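The batch-size semantics being reviewed can be sketched as below: a non-positive (or, per the suggestion, zero) maximum means one unbounded batch, otherwise the statement batch is executed every `maxBatchSize` rows. The `Batch` class is a stand-in for a JDBC `PreparedStatement`'s `addBatch()`/`executeBatch()`, so the flush points are observable.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizeSketch {

    // Records how many rows each executeBatch() call flushed.
    static class Batch {
        private int pending = 0;
        final List<Integer> executedBatchSizes = new ArrayList<>();
        void addBatch()     { pending++; }
        void executeBatch() { if (pending > 0) { executedBatchSizes.add(pending); pending = 0; } }
    }

    public static List<Integer> writeRows(int rowCount, int maxBatchSize) {
        Batch batch = new Batch();
        for (int row = 0; row < rowCount; row++) {
            batch.addBatch();
            // Flush whenever a positive limit is reached; non-positive = unbounded.
            if (maxBatchSize > 0 && (row + 1) % maxBatchSize == 0) {
                batch.executeBatch();
            }
        }
        batch.executeBatch(); // flush the remainder
        return batch.executedBatchSizes;
    }

    public static void main(String[] args) {
        System.out.println(writeRows(10, 4)); // [4, 4, 2]
        System.out.println(writeRows(10, 0)); // [10]
    }
}
```

With zero meaning "unbounded", a NONNEGATIVE_INTEGER_VALIDATOR suffices, which is the simplification suggested in the comment.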
[GitHub] nifi pull request #3128: NIFI-5788: Introduce batch size limit in PutDatabas...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3128#discussion_r230811717 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java --- @@ -265,6 +265,17 @@ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); +static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() --- End diff -- We should be consistent here with "batch size" and "bulk size" in the naming of variables, documentation, etc. Maybe "Maximum Batch Size"? ---
[GitHub] nifi issue #3117: NIFI-5770 Fix Memory Leak in ExecuteScript on Jython
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3117 +1 LGTM, ran unit tests and tried a Jython script in a live NiFi instance. Thanks for the improvement! Merging to master ---
[GitHub] nifi issue #3123: NIFI-5781 - Incorrect schema for provenance events in Site...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3123 +1 LGTM, verified the documentation looks correct, and used the output S2S Prov Reporting Task in various record-based processors with the updated schema. Thanks for the improvement! Merging to master ---
[GitHub] nifi issue #3041: NIFI-5224 Added SolrClientService.
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3041 Mind updating the versions to 1.9.0-SNAPSHOT? Please and thanks! ---
[GitHub] nifi issue #3105: NIFI-5621: Added Cassandra connection provider service
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3105 Mind rebasing this and setting all 1.8.0-SNAPSHOT versions to 1.9.0-SNAPSHOT? Please and thanks! ---
[GitHub] nifi issue #3107: NIFI-5744: Put exception message to attribute while Execut...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3107 Just to connect the dots here, there is some dissent from the PMC about this approach, so we should do our best to reach consensus on the mailing list before this PR is merged, in case the direction changes (switch from an attribute to additional outgoing relationships, e.g.) ---
[GitHub] nifi issue #3020: NIFI-5625: support the variables for the properties of HTT...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3020 Mind rebasing this against the latest master? Please and thanks! ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325643 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -132,17 +132,25 @@ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); +// Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") -.description("A FlowFile is transferred to this relationship if the operation completed successfully.") +.description("Successfully created FlowFile from CQL query result set.") --- End diff -- This relationship is reused by PutCassandraQL as well, where there is no result set or query per se (it's a statement). That's why the doc is so generic. If you'd like to have the doc be more specific, you can create a REL_SUCCESS relationship in QueryCassandra using `new Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your description override").build()` ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325897 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -33,7 +32,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringEscapeUtils; --- End diff -- I think we're on the verge of using Apache Commons Text instead of Commons Lang 3, maybe consider keeping this the way it is? ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228332165 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou outStream.write("{\"results\":[".getBytes(charset)); final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions(); long nrOfRows = 0; +long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); + if (columnDefinitions != null) { -do { - -// Grab the ones we have -int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); -if (rowsAvailableWithoutFetching == 0) { -// Get more -if (timeout <= 0 || timeUnit == null) { -rs.fetchMoreResults().get(); -} else { -rs.fetchMoreResults().get(timeout, timeUnit); -} + +// Grab the ones we have +if (rowsAvailableWithoutFetching == 0) { +// Get more +if (timeout <= 0 || timeUnit == null) { +rs.fetchMoreResults().get(); +} else { +rs.fetchMoreResults().get(timeout, timeUnit); } +rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); +} -for (Row row : rs) { -if (nrOfRows != 0) { +if(maxRowsPerFlowFile == 0){ +maxRowsPerFlowFile = rowsAvailableWithoutFetching; +} +Row row; +while(nrOfRows < maxRowsPerFlowFile){ +try { +row = rs.iterator().next(); +}catch (NoSuchElementException nsee){ +//nrOfRows -= 1; --- End diff -- This is commented out here but active in the Avro version above, I assume they need to be the same? ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228331847 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAv
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228327507 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAv
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228328178 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAv
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 +1 LGTM, tested successfully. Thanks for the fix! Merging to master ---
[GitHub] nifi issue #3107: NIFI-5744: Put exception message to attribute while Execut...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3107 Reviewing... ---
[GitHub] nifi issue #3104: NIFI-5740 Ensuring permissions are restored after test com...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3104 +1 LGTM, verified I couldn't delete the directories manually without this change, and I can afterwards. Thanks for the fix! Merging to master ---
[GitHub] nifi issue #3094: NIFI-5727 - Optimize GenerateTableFetch to remove unnecess...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3094 +1 LGTM, ran regression tests and tried on a NiFi instance. Thanks for the improvement! Merging to master ---
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 Not sure if you want to try another rebase to see if the repo problem is gone (and the realm problem is fixed), or just Ignore the test. I'm fine with either, let me know when you're happy with it (and have tested it even if there's no unit test enabled) and I'll finish the review/merge. Thanks! ---
[GitHub] nifi issue #3092: NIFI-5525 - CSVRecordReader fails with StringIndexOutOfBou...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3092 Pierre made the change Peter recommended (I verified). I think we need it in now as the current Jira is only partially complete for 1.8.0 without it. Peter, I'm going to go ahead and merge this so we can cut the RC, please let me know if you disagree and we can discuss further; otherwise I suspect we're good to go, thanks all! ---
[GitHub] nifi issue #3101: NIFI-5736: Removed 'synchronized' keyword from SwappablePr...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3101 +1 LGTM, ran full build with tests and tried a live NiFi instance. Thanks for the fix! Merging to master ---
[GitHub] nifi pull request #3092: NIFI-5525 - CSVRecordReader fails with StringIndexO...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3092#discussion_r227045025 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java --- @@ -132,6 +131,10 @@ protected final Object convertSimpleIfPossible(final String value, final DataTyp return value; } +private String trim(String value) { +return value.startsWith("\"") && value.endsWith("\"") && (value.length() > 1) ? value.substring(1, value.length() - 1) : value; --- End diff -- Doesn't seem like it would offer a significant performance improvement on average, only if most of the strings to be trimmed were single characters. I'm +1 on this one if you are, I can do the merge (to get this into 1.8.0 RC3) if you like. ---
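The `trim` method quoted in the diff above can be exercised in isolation; the `length() > 1` guard is what keeps a lone quote character from producing the StringIndexOutOfBoundsException this PR fixes (class name and `main` harness here are hypothetical):

```java
public class TrimSketch {

    // Strip a surrounding pair of double quotes, but only when the value is
    // long enough that the leading and trailing quote are distinct characters.
    static String trim(String value) {
        return value.startsWith("\"") && value.endsWith("\"") && (value.length() > 1)
                ? value.substring(1, value.length() - 1)
                : value;
    }

    public static void main(String[] args) {
        System.out.println(trim("\"hello\"")); // quotes stripped -> hello
        System.out.println(trim("\""));        // lone quote left alone (the length guard)
        System.out.println(trim("plain"));     // unquoted value passes through
    }
}
```
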
[GitHub] nifi issue #3079: NIFI-5706 : Added ConvertAvroToParquet processor
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3079 What is the use case for having Parquet files flow through NiFi? I wrote ConvertAvroToORC and every time I saw it used, it was immediately followed by PutHDFS. So I followed the PutParquet lead and wrote a PutORC for the Hive 3 NAR. We can't currently do anything with a Parquet file (or ORC for that matter) in NiFi, so just curious as to how you envision it being used. Also I wonder if a ParquetRecordWriter might be a better idea? It would do the same thing as a processor but the record processors can read in something in any format, not just Avro. This was another impetus for having PutORC instead of ConvertAvroToORC. ---
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 Doesn't look like the tests were run due to a bad artifact. Maybe rebase the PR (squashing just the commits where you tried individual things?) and force push, hopefully it'll go through this time :( Sorry this is such a pain. ---
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 The krb5.conf from the TestRangerNiFiAuthorizer looks like this: ``` [libdefaults] default_realm = EXAMPLE.COM dns_lookup_kdc = false dns_lookup_realm = false [realms] EXAMPLE.COM = { kdc = kerberos.example.com admin_server = kerberos.example.com } ``` And doesn't have the setting of the `java.security.krb5.realm` or `java.security.krb5.kdc` (I assume because they are unnecessary based on the dns_lookup_* properties?). Might be worth a try... ---
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 First one didn't work :( ---
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 Same here, the relevant error in the Travis log is "Cannot locate default realm". Looking at other tests that load krb5.conf, one (TestHBase_1_1_2_ClientService) has this: ``` // needed for calls to UserGroupInformation.setConfiguration() to work when passing in // config with Kerberos authentication enabled System.setProperty("java.security.krb5.realm", "nifi.com"); System.setProperty("java.security.krb5.kdc", "nifi.kdc"); ``` and another (TestRangerNiFiAuthorizer) has this: ``` // have to initialize this system property before anything else File krb5conf = new File("src/test/resources/krb5.conf"); assertTrue(krb5conf.exists()); System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath()); ``` Perhaps one or both of these would fix the issue? Not sure how to reproduce, I guess you could try one and push the commit to see if Travis succeeds... ---
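The two workarounds quoted above boil down to setting JVM Kerberos system properties before any Kerberos class is initialized. A sketch under that assumption (realm, KDC host, and file path are placeholders, not values from this PR):

```java
import java.io.File;

public class Krb5TestSetup {

    // Option 1 (TestHBase_1_1_2_ClientService style): pin the realm and KDC
    // directly so UserGroupInformation.setConfiguration() can resolve a realm.
    static void configureViaProperties() {
        System.setProperty("java.security.krb5.realm", "EXAMPLE.COM");
        System.setProperty("java.security.krb5.kdc", "kerberos.example.com");
    }

    // Option 2 (TestRangerNiFiAuthorizer style): point the JVM at a krb5.conf.
    // This must run before anything else touches the Kerberos machinery.
    static void configureViaConfFile(File krb5conf) {
        System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath());
    }

    public static void main(String[] args) {
        configureViaProperties();
        System.out.println(System.getProperty("java.security.krb5.realm"));
    }
}
```

The two options are mutually exclusive in practice: when `java.security.krb5.conf` is set, the realm/kdc properties are ignored.
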
[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3086 Looks like the unit test is throwing the wrong exception, IllegalArgumentException instead of InitializationException ---
[GitHub] nifi issue #3081: NIFI-5708 Fixing the creation of ValidationContextFactory ...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3081 +1 LGTM, thanks for the fix! Merging to master ---
[GitHub] nifi pull request #3076: NIFI-5705: Added Hive 3 attribution to nifi-assembl...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3076

NIFI-5705: Added Hive 3 attribution to nifi-assembly and top-level NOTICEs

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5705 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3076.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3076 commit e4c1fc0d8e6bbe6ca4b07f7556f248567721a268 Author: Matthew Burgess Date: 2018-10-15T23:34:05Z NIFI-5705: Added Hive 3 attribution to nifi-assembly and top-level NOTICEs ---
[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3075

NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch

Note that this should work with NIFI-5601 (#3074) such that the fragment attributes would be written to the empty flow file as well. Depending on which gets merged first, the other probably needs to be updated.

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5604 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3075.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3075 commit 43bfe9531a0aa2a64179d27b4196a6062f60ebbe Author: Matthew Burgess Date: 2018-10-15T20:27:38Z NIFI-5604: Added property to allow empty FlowFile when no SQL generated by GenerateTableFetch ---
[GitHub] nifi pull request #3074: NIFI-5601: Add fragment.* attributes to GenerateTab...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3074

NIFI-5601: Add fragment.* attributes to GenerateTableFetch

Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5601 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3074.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3074 commit bd6ce6d8c74b65a222fb2a842465e5aae2d40272 Author: Matthew Burgess Date: 2018-10-15T20:07:13Z NIFI-5601: Add fragment.* attributes to GenerateTableFetch ---
[GitHub] nifi issue #3062: NIFI-5686: Updated StandardProcessScheduler so that if it ...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3062 +1 LGTM, ran full build with unit tests, tried on a live instance with reporting tasks and various other flows. Thanks for the improvement! Merging to master ---
[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2861#discussion_r225216435 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.elasticsearch; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.jayway.jsonpath.JsonPath; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchError; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.elasticsearch.put.FlowFileJsonDescription; +import org.apache.nifi.processors.elasticsearch.put.JsonProcessingError; +import org.apache.nifi.util.StringUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index"}) +@CapabilityDescription("This processor puts user-supplied JSON into ElasticSearch. 
It does not require a schema.") +public class PutElasticsearchJson extends AbstractProcessor implements ElasticSearchRestProcessor { + +static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder() +.name("put-es-json-id-attribute") +.displayName("ID Attribute") +.description("The attribute to use for setting the document ID in ElasticSearch. If the payload is an array, " + +"and this option is used for getting the ID, an exception will be raised.") +.required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(Validator.VALID) +.build(); +static final PropertyDescriptor ID_JSON_PATH = new PropertyDescriptor.Builder() +.name("put-es-json-id-json-path") +.displayName("ID JSONPath") +.description("If set, the document ID will be pulled from each json block using this JSONPath operation.") --- End diff -- Nitpick, "json" should be "JSON" everywhere in the doc for consistency ---
[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2861#discussion_r225215375 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml --- @@ -107,6 +146,38 @@ language governing permissions and limitations under the License. --> + +org.apache.maven.plugins +maven-compiler-plugin --- End diff -- I think we talked about this before but can't find it. Does it need to be here? I thought we can inherit the one from the top-level parent POM? ---
[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2861#discussion_r225215710 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java --- @@ -75,6 +76,19 @@ .required(true) .build(); +Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("All flowfiles that fail for reasons unrelated to server availability go to this relationship.") +.build(); +Relationship REL_RETRY = new Relationship.Builder() +.name("retry") +.description("All flowfiles that fail due to server/cluster availability go to this relationship.") +.build(); +Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All flowfiles that succeed in being indexed into ElasticSearch go here.") --- End diff -- Maybe "transmitted to ElasticSearch"? Unless "indexed" is a generic enough term that users would know covers upserts, deletes, and any other supported operation. ---
[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2861#discussion_r225208197 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchError; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"}) +@CapabilityDescription("A record-aware ElasticSearch put processor that uses the official Elastic REST client libraries.") +public class 
PutElasticsearchRecord extends AbstractProcessor implements ElasticSearchRestProcessor { +static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() +.name("put-es-record-reader") +.displayName("Record Reader") +.description("The record reader to use for reading incoming records from flowfiles.") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder() --- End diff -- If this is specified, does the field remain in the document? What if the user didn't want to have that field included in the document, but needed it in order to specify the operation as a field? What if the index operation were in an attribute versus a record field? Like if you partitioned records based on whether they were upsert vs delete operations (for example). Perhaps consider something like UpdateRecord's "Replacement Value Strategy" property, which allows you to choose whether the Operation field would be evaluated as a Record Path or a Literal Value. Both can support EL, but the latter would allow the setting of a single operation for all the records in the flow file. ---
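The "Replacement Value Strategy"-style suggestion in the comment above can be sketched with a toy resolver. Everything here is hypothetical illustration (the `Strategy` enum, the `resolve` helper, the `/op` lookup) — real NiFi record paths are far richer than this top-level-field lookup:

```java
import java.util.Map;

public class OperationResolver {
    enum Strategy { RECORD_PATH, LITERAL }

    static String resolve(Strategy strategy, String propertyValue, Map<String, Object> record) {
        if (strategy == Strategy.LITERAL) {
            // One operation applies to every record in the flow file.
            return propertyValue;
        }
        // RECORD_PATH: toy per-record lookup of a top-level field such as "/op".
        Object op = record.get(propertyValue.replaceFirst("^/", ""));
        // Default to "index" when the record carries no operation field.
        return op == null ? "index" : op.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve(Strategy.LITERAL, "delete", Map.of()));          // delete
        System.out.println(resolve(Strategy.RECORD_PATH, "/op", Map.of("op", "upsert"))); // upsert
    }
}
```

The literal branch covers the "partitioned by operation, set via attribute/EL" case; the record-path branch covers mixed-operation flow files.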
[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2861#discussion_r225208562 --- Diff: nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.ElasticSearchError; +import org.apache.nifi.elasticsearch.IndexOperationRequest; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathValidator; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"}) +@CapabilityDescription("A record-aware ElasticSearch put processor that uses the official Elastic REST client libraries.") +public class 
PutElasticsearchRecord extends AbstractProcessor implements ElasticSearchRestProcessor { +static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() +.name("put-es-record-reader") +.displayName("Record Reader") +.description("The record reader to use for reading incoming records from flowfiles.") +.identifiesControllerService(RecordReaderFactory.class) +.required(true) +.build(); + +static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder() +.name("put-es-record-operation-path") +.displayName("Operation Record Path") +.description("A record path expression to retrieve index operation setting from each record. If left blank, " + +"all operations will be assumed to be index operations.") +.addValidator(new RecordPathValidator()) +.required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder() +.name("put-es-record-id-path") +.displayName("ID Record Path") +.description("A record path expression to retrieve the ID field for use with ElasticSearch.
[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3057 Updated the rest of the backticks, thanks @VikingK and @bbende for your reviews! ---
[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3057 I found the issue and pushed a new commit with the fix. Your test data also exercised another part of the code I hadn't thought of: your "as" field is a reserved keyword in Hive, so when I used the generated hive.ddl attribute to create the table on top of the ORC files, it didn't work. The other change in this commit is to backtick-quote the field names to protect against field names that are reserved words. ---
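The backtick-quoting described here is simple enough to sketch in isolation. This helper is a hypothetical stand-in, not the actual NiFiOrcUtils change:

```java
// Sketch: backtick-quote Hive identifiers so reserved words like "as"
// survive in generated DDL. Embedded backticks are doubled, per Hive's
// quoted-identifier convention.
public class HiveDdlQuoting {
    static String quote(String fieldName) {
        return "`" + fieldName.replace("`", "``") + "`";
    }

    public static void main(String[] args) {
        System.out.println(quote("as"));     // `as`
        System.out.println(quote("normal")); // `normal`
    }
}
```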
[GitHub] nifi pull request #3060: NIFI-5678: Fixed MAP type support of MapRecord obje...
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3060 NIFI-5678: Fixed MAP type support of MapRecord objects in StandardSchemaValidator Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5678 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3060.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3060 commit c66de368d07f37a8652c09e4b116b232972821ac Author: Matthew Burgess Date: 2018-10-10T19:01:40Z NIFI-5678: Fixed MAP type support of MapRecord objects in StandardSchemaValidator ---
[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3057#discussion_r224126176 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java --- @@ -157,19 +155,17 @@ public String getDefaultCompressionType(final ProcessorInitializationContext con public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema) throws IOException, SchemaNotFoundException { -final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); - final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue(); final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue()); final boolean normalizeForHive = context.getProperty(HIVE_FIELD_NAMES).asBoolean(); -TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, normalizeForHive); +TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, normalizeForHive); final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, orcSchema, stripeSize, compressionType, bufferSize); final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet() ? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue() -: NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName()); +: NiFiOrcUtils.normalizeHiveTableName(schema.toString());// TODO --- End diff -- Yep that's not the right thing to put there :) Will investigate getting a name from the record somehow, or defaulting to a hardcoded table name if none is provided. ---
[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3057 Can you share the schema of the data, and possibly a JSON export of your Avro file? I couldn't reproduce this with an array of ints, and @bbende ran successfully with an array of records. ---
[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3057#discussion_r223893228 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java --- @@ -157,19 +155,17 @@ public String getDefaultCompressionType(final ProcessorInitializationContext con public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema) throws IOException, SchemaNotFoundException { -final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema); - final long stripeSize = context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue(); final int bufferSize = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); final CompressionKind compressionType = CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue()); final boolean normalizeForHive = context.getProperty(HIVE_FIELD_NAMES).asBoolean(); -TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, normalizeForHive); +TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, normalizeForHive); final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, orcSchema, stripeSize, compressionType, bufferSize); final String hiveTableName = context.getProperty(HIVE_TABLE_NAME).isSet() ? context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue() -: NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName()); +: NiFiOrcUtils.normalizeHiveTableName(schema.toString());// TODO --- End diff -- I admit I hadn't tested this part, the TODO should be removed but we likely need a way to get at the "name" of the top-level record if the Hive Table Name property is not set. Then again, I haven't seen anyone rely on the schema's full name as the table name, the Hive Table Name property is the recommended way to set this for the generated DDL. Welcome all comments though :) ---
[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3057#discussion_r223892664 --- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java --- @@ -163,19 +161,30 @@ public static Object convertToORCObject(TypeInfo typeInfo, Object o, final boole .mapToObj((element) -> convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 1, hiveFieldNames)) .collect(Collectors.toList()); } -if (o instanceof GenericData.Array) { -GenericData.Array array = ((GenericData.Array) o); -// The type information in this case is interpreted as a List -TypeInfo listTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); -return array.stream().map((element) -> convertToORCObject(listTypeInfo, element, hiveFieldNames)).collect(Collectors.toList()); -} if (o instanceof List) { return o; } +if (o instanceof Record) { --- End diff -- This is actually the part that fixes the nested records issue. The rest is that from the Record, we can only get RecordSchema info, where the original util methods required Avro schema/type info. The other changes are a consequence of this, to replace Avro-specific stuff with NiFi Record API stuff. ---
[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3057#discussion_r223892405 --- Diff: nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java --- @@ -58,6 +58,10 @@ public void addSchemaField(final String fieldName, final RecordFieldType type, b fields.add(new RecordField(fieldName, type.getDataType(), isNullable)); } +public void addSchemaField(final RecordField recordField) { --- End diff -- This preserves the full data type of the field. RecordFieldType.getDataType(), which is called from the other add() methods, returns the "base" type, such as "ARRAY" instead of "ARRAY[INT]", for equality purposes. ---
[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC
GitHub user mattyb149 opened a pull request: https://github.com/apache/nifi/pull/3057 NIFI-5667: Add nested record support for PutORC The basic approach here is that I removed all references to Avro schemas/fields and replaced them with NiFi Record API concepts. This allows us to not have to switch back and forth, since Avro is not the de facto standard for schemas or flow file content (although we are still fairly tightly coupled to Avro schemas, but that's a different issue :) ) I'll comment in the PR on various parts of the code to note the "real" changes to fix the reported issue. ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? 
### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mattyb149/nifi NIFI-5667 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3057.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3057 commit ab16d6f77c7dead859643492fe650685ffd7e4ba Author: Matthew Burgess Date: 2018-10-09T22:59:00Z NIFI-5667: Add nested record support for PutORC ---
[GitHub] nifi issue #2966: NIFI-5552 - Add option to normalize header column names in...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2966 I'm a little leery of automatically normalizing the names. In [NIFI-4612](https://issues.apache.org/jira/browse/NIFI-4612) we added the ability to disable name validation for the AvroSchemaRegistry, so there could be Avro-illegal field names but they'd be preserved. I think that came up for things like PutDatabaseRecord where the field names needed to be preserved but were not Avro-valid. If we automatically normalize them, there might be unintended (and undesired) consequences to things like PutParquet and PutDatabaseRecord. Perhaps instead of (or in addition to) normalization, we could provide the option to "disable schema validation". This isn't technically Avro-specific, but would be used while we are so coupled to Avro schemas in the meantime? ---
[GitHub] nifi issue #2997: NIFI-5583: Add cdc processor for MySQL referring to GTID.
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2997 Do we need a separate processor for this, or can we just add GTID support to the existing one (perhaps with a property such as "Include GTID" or whatever, defaulting to false but if set to true, executes the GTID-related code)? ---
[GitHub] nifi issue #3025: NIFI-5605 Added UpdateCassandra.
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3025 Can PutCassandraRecord also be improved to allow inserts, updates, deletes? I'm thinking PutCassandraRecord could be like PutDatabaseRecord. Doesn't PutCassandraQL already support updates since the user provides the CQL to execute? ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r223701761 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile fileToProcess = null; +FlowFile inputFlowFile = null; if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } +session.remove(inputFlowFile); --- End diff -- I don't follow the logic here. If there is a flow file in the incoming connection, this appears to remove it from the session, and fileToProcess will always be null, which means we couldn't use flow file attributes to evaluate properties such as CQL Query, Query Timeout, Charset, etc. Another effect is that provenance/lineage will not be preserved for incoming files, as the incoming file will be removed, and any flow files generated by this processor will appear to have been created here, so you can't track that a flow file "A" came in and, as a result, generated flow files X,Y,Z. ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r223700156 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -132,6 +131,20 @@ private final static List propertyDescriptors; +// Relationships +public static final Relationship REL_SUCCESS = new Relationship.Builder() --- End diff -- These relationships are available (albeit with more generic descriptions) in the AbstractCassandraProcessor class. If we need more specific documentation then we should remove the common relationships and provide specific documentation/relationships in all Cassandra processors. ---
[GitHub] nifi issue #2974: NIFI-5533: Be more efficient with heap utilization
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2974 +1 LGTM, ran full build with unit tests and tried many different scenarios with record processors, attributes, etc. Thanks for the improvements! Merging to master ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222757166 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final 
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222753864 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.ByteArrayInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.SummaryCounters; + +import com.fasterxml.jackson.databind.ObjectMapper; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching --- End diff -- Since this processor can delete data from the target, I'm not sure SupportsBatching is appropriate here. Then again, I don't know enough about the nuances, perhaps @markap14 can chime in. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222758854 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final 
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() --- End diff -- For user-friendliness, couldn't we remove the onus from the user and just pick a dummy non-blank password if none was supplied in the property? Then it wouldn't need to be a required property. ---
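The suggestion above — substitute a dummy non-blank password when none is configured, instead of making the property required — reduces to a small fallback step. A stdlib-only sketch (the helper name and placeholder value are hypothetical; the "non-blank even when auth is disabled" constraint is taken from the PR's own property description):

```java
public class PasswordFallback {
    // Hypothetical helper: per the PR, a non-blank password is required by
    // the driver even when authentication is disabled on the server, so
    // substitute a placeholder rather than forcing the user to invent one.
    static String effectivePassword(String configured) {
        return (configured == null || configured.trim().isEmpty())
                ? "unused-dummy-password"
                : configured;
    }

    public static void main(String[] args) {
        System.out.println(effectivePassword(null));     // unused-dummy-password
        System.out.println(effectivePassword("s3cret")); // s3cret
    }
}
```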

Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222758485 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") --- End diff -- The Query is in Cypher, my preference is for this property to refer to Cypher rather than Neo4J. See my other comments about making this processor more generic to support more openCypher platforms. Also, would Cypher's syntax cause issues when evaluating Expression Language? It doesn't look like you're using parameters so I wouldn't expect something like an "UNWIND $myList" or anything, but just in case it might be a good idea to mention any caveats, limitations, etc. in the description of this property. ---
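On the Expression Language question above: NiFi EL delimits expressions with `${...}`, while Cypher parameters are bare `$name`, so a query like `UNWIND $myList` passes through EL untouched — only a literal `${` sequence in the Cypher text would be picked up. A stdlib-only illustration (the regex stands in for EL detection and is not NiFi's actual parser):

```java
import java.util.regex.Pattern;

public class ElCollisionCheck {
    // Illustrative only: matches ${...} tokens the way NiFi EL delimits
    // expressions; bare Cypher parameters ($name) do not match.
    static boolean containsElStyleToken(String query) {
        return Pattern.compile("\\$\\{[^}]*\\}").matcher(query).find();
    }

    public static void main(String[] args) {
        String cypher = "UNWIND $myList AS x MATCH (n {id: x}) RETURN n";
        System.out.println(containsElStyleToken(cypher));             // false
        System.out.println(containsElStyleToken("RETURN '${attr}'")); // true
    }
}
```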
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222756058 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() 
+.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.name(), "Least Connected", "Least Connect
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222756361 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final 
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user. A dummy non-blank password is required even if it disabled on the server.") +.required(true) +.sensitive(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r222760950 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- What if the Cypher query was the content of the incoming flow file? ExecuteSQL makes this property not required, and if left empty, it assumes that the query is in the flow file content and uses that. I've seen some pretty large Cypher queries, in this case I'd need an ExtractText to get the whole thing into an attribute just so I can pass it back to this property via Expression Language. It's not a total deal-breaker for me, just wanted to suggest/recommend it for user-friendliness. 
I know for ExecuteSQL that property is very often left blank, as something upstream generates the SQL as the body of the flow file. ---
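The ExecuteSQL convention described above — an unset query property means "read the query from the flow file body" — reduces to a small resolution step. A stdlib-only sketch (the method and class names are hypothetical):

```java
import java.nio.charset.StandardCharsets;

public class QueryResolution {
    // Mirrors the ExecuteSQL convention: if the query property is blank,
    // assume the query is the incoming flow file's content.
    static String resolveQuery(String queryProperty, byte[] flowFileContent) {
        if (queryProperty == null || queryProperty.trim().isEmpty()) {
            return new String(flowFileContent, StandardCharsets.UTF_8);
        }
        return queryProperty;
    }

    public static void main(String[] args) {
        byte[] body = "MATCH (n) RETURN count(n)".getBytes(StandardCharsets.UTF_8);
        System.out.println(resolveQuery(null, body));       // MATCH (n) RETURN count(n)
        System.out.println(resolveQuery("RETURN 1", body)); // RETURN 1
    }
}
```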
[GitHub] nifi issue #2945: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRe...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2945 @pvillard31 @MikeThomsen Think we can get the final review done in time for 1.8.0? Folks have been asking for these processors for a long time :) ---
[GitHub] nifi issue #3025: NIFI-5605 Added UpdateCassandra.
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3025 Is there a reason we can't have delete/update capabilities in PutCassandraQL and/or PutCassandraRecord? Wondering why we need a separate processor. If it does need to be separate, maybe UpdateCassandraRecord instead of (or in addition to) this one? ---
[GitHub] nifi issue #3015: NIFI-5254 [WIP] Updated to Groovy 2.5.2
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3015 Unfortunately I believe you need to enumerate all the transitive dependencies in the L&N files. Normally you could copy that from the Groovy NOTICE, but Groovy's NOTICE does not list its own dependencies (assuming some of them carry NOTICEs of their own), so we have to enumerate them here. Their LICENSE mentions a couple of dependencies, but I'm not sure it's complete, so you'll have to verify that both our LICENSE and NOTICE include the appropriate entries. ---
[GitHub] nifi issue #2861: NIFI-5248 Added new Elasticsearch json and record processo...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/2861 Mind rebasing this one? I think you were waiting on the ES LookupService PR anyway, and now it's in :) ---
[GitHub] nifi issue #3042: NIFI-5650: Added Xerces to scripting bundle for Jython 2.7...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3042 Thanks! You might need to update your master before pushing; I brought in your ES Lookup Service PR (finally :P) ---