[GitHub] nifi issue #2794: NIFI-4579: Fix ValidateRecord type coercing

2018-12-13 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2794
  
+1 LGTM, verified the unit test illustrates the behavior and the patch 
fixes it. Thanks for the fix! Merging to master


---


[GitHub] nifi pull request #3217: NIFI-3988: Add fragment attributes to SplitRecord

2018-12-13 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3217

NIFI-3988: Add fragment attributes to SplitRecord

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-3988

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3217.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3217


commit 1b95cd85fe623e6f981340dbfdcdc3c1034dbd5d
Author: Matthew Burgess 
Date:   2018-12-13T19:02:14Z

NIFI-3988: Add fragment attributes to SplitRecord




---


[GitHub] nifi issue #3216: NIFI-5891 fix handling of null logical types in Hive3Strea...

2018-12-13 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3216
  
+1 LGTM, verified the unit tests illustrate the problem and the patch fixes 
it. There are 2 CheckStyle violations but I will fix them on merge. Thanks for 
the fix! Merging to master


---


[GitHub] nifi pull request #2521: NIFI-4946 nifi-spark-bundle : Adding support for py...

2018-12-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2521#discussion_r239241291
  
--- Diff: nifi-nar-bundles/nifi-spark-bundle/nifi-livy-processors/src/main/java/org/apache/nifi/processors/livy/ExecuteSparkInteractive.java ---
@@ -178,48 +247,188 @@ public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {

         String sessionId = livyController.get("sessionId");
         String livyUrl = livyController.get("livyUrl");
-        String code = context.getProperty(CODE).evaluateAttributeExpressions(flowFile).getValue();
-        if (StringUtils.isEmpty(code)) {
-            try (InputStream inputStream = session.read(flowFile)) {
-                // If no code was provided, assume it is in the content of the incoming flow file
-                code = IOUtils.toString(inputStream, charset);
-            } catch (IOException ioe) {
-                log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
-                flowFile = session.penalize(flowFile);
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-        }
 
-        code = StringEscapeUtils.escapeJavaScript(code);
-        String payload = "{\"code\":\"" + code + "\"}";
+
         try {
-            final JSONObject result = submitAndHandleJob(livyUrl, livySessionService, sessionId, payload, statusCheckInterval);
-            log.debug("ExecuteSparkInteractive Result of Job Submit: " + result);
-            if (result == null) {
-                session.transfer(flowFile, REL_FAILURE);
-            } else {
+
+            if (isBatchJob) {
+
+                String jsonResponse = null;
+
+                if (StringUtils.isEmpty(jsonResponse)) {
+                    try (InputStream inputStream = session.read(flowFile)) {
+                        // If no code was provided, assume it is in the content of the incoming flow file
+                        jsonResponse = IOUtils.toString(inputStream, charset);
+                    } catch (IOException ioe) {
+                        log.error("Error reading input flowfile, penalizing and routing to failure", new Object[]{flowFile, ioe.getMessage()}, ioe);
+                        flowFile = session.penalize(flowFile);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
+                log.debug(" > jsonResponse: " + jsonResponse);
+
                 try {
-                    final JSONObject output = result.getJSONObject("data");
-                    flowFile = session.write(flowFile, out -> out.write(output.toString().getBytes()));
-                    flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    session.transfer(flowFile, REL_SUCCESS);
-                } catch (JSONException je) {
-                    // The result doesn't contain the data, just send the output object as the flow file content to failure (after penalizing)
-                    log.error("Spark Session returned an error, sending the output JSON object as the flow file content to failure (after penalizing)");
-                    flowFile = session.write(flowFile, out -> out.write(result.toString().getBytes()));
+
+                    final JSONObject jsonResponseObj = new JSONObject(jsonResponse);
+
+                    Map<String, String> headers = new HashMap<>();
+                    headers.put("Content-Type", LivySessionService.APPLICATION_JSON);
+                    headers.put("X-Requested-By", LivySessionService.USER);
+                    headers.put("Accept", "application/json");
+
+                    JSONObject jobInfo = readJSONObjectFromUrl(jsonResponseObj.getString("url"), livySessionService, headers);
+
+                    flowFile = session.write(flowFile, out -> out.write(jobInfo.toString().getBytes()));
                     flowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), LivySessionService.APPLICATION_JSON);
-                    flowFile = session.penalize(flowFile);
+
+                    Thread.sleep(statusCheckInterval);
+
+                    String state = jobInfo.getString("state");
+                    log.debug(" > jsonResponseObj State: " + state);
+

[GitHub] nifi pull request #3191: NIFI-5855: Remove unnecessary ORDER BY clause in Ge...

2018-11-29 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3191

NIFI-5855: Remove unnecessary ORDER BY clause in GenerateTableFetch when 
Partition Size is zero

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5855

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3191


commit 78a3c9badf4724691022081413bbb0a6de2f960e
Author: Matthew Burgess 
Date:   2018-11-29T22:46:24Z

NIFI-5855: Remove unnecessary ORDER BY clause in GenerateTableFetch when 
Partition Size is zero




---


[GitHub] nifi pull request #3188: NIFI-5829 Create Lookup Controller Services for Rec...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3188#discussion_r237218520
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "parse", "record", "row", "reader"})
+@SeeAlso({RecordSetWriterLookup.class})
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " +
+        "requires an attribute named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " +
+        "if the attribute is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " +
+        "registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' attribute.")
+@DynamicProperty(name = "The ", value = "RecordWriterFactory property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
+        description = "")
+public class ReaderLookup extends AbstractControllerService implements RecordReaderFactory {
--- End diff --

Now that controller services can have access to attributes, it would be 
nice to add support in the documentation for `ReadsAttributes` annotations. You 
don't need to do that here, just leaving it as a reminder for a future 
improvement Jira :)
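
For reference, this is the processor-side pattern the reminder refers to; whether and how it would apply to controller services is the open question. A minimal sketch using the existing `ReadsAttribute`/`ReadsAttributes` annotations from nifi-api (the processor itself is hypothetical; the attribute name is taken from the ReaderLookup description above):

    import org.apache.nifi.annotation.behavior.ReadsAttribute;
    import org.apache.nifi.annotation.behavior.ReadsAttributes;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    // Documents, for the generated component docs, which flow file attributes are read
    @ReadsAttributes({
        @ReadsAttribute(attribute = "recordreader.name",
                description = "Selects the RecordReaderFactory registered under this name")
    })
    public class ExampleProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // no-op; the annotations above are what this sketch illustrates
        }
    }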


---


[GitHub] nifi pull request #3188: NIFI-5829 Create Lookup Controller Services for Rec...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3188#discussion_r237218092
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/lookup/ReaderLookup.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"lookup", "parse", "record", "row", "reader"})
+@SeeAlso({RecordSetWriterLookup.class})
+@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " +
+        "requires an attribute named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " +
+        "if the attribute is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " +
+        "registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' attribute.")
+@DynamicProperty(name = "The ", value = "RecordWriterFactory property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
--- End diff --

I realize this was copy-pasted from DBCPConnectionPoolLookup, but I think 
it makes the documentation choppy. IMO it would be better to have the name be 
something like `` and the value something like `A RecordReader controller service`.

Also, this is ReaderLookup but the current value references RecordWriterFactory.


---


[GitHub] nifi issue #3186: NIFI-5843 added subjects to the error message to make it c...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3186
  
+1 LGTM, thanks for the improvement! Merging to master


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138338
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile is transferred to this relationship if the operation failed.")
+            .description("CQL query execution failed.")
--- End diff --

I'd prefer `CQL operation` rather than `query`, as PutCassandra is more of 
a statement than a query, and this base processor could be used to do other 
things. Technically a processor wouldn't have to use CQL (although I'm not sure 
of other ways to interact with Cassandra), and I think the user would be 
comfortable with the term CQL standing in for Cassandra even if CQL itself weren't used.


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138928
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
--- End diff --

Maybe `CQL operation` here instead of `query execution`; see comments below


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237141917
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile inputFlowFile = null;
         FlowFile fileToProcess = null;
+
+        Map<String, String> attributes = null;
+
         if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
+            inputFlowFile = session.get();
 
             // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
             // However, if we have no FlowFile and we have connections coming from other Processors, then
             // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
+            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                 return;
             }
+
+            attributes = inputFlowFile.getAttributes();
         }
 
         final ComponentLog logger = getLogger();
-        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
+        if(inputFlowFile != null){
+            session.transfer(inputFlowFile, REL_ORIGINAL);
         }
 
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
+
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAv

[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237152109
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -269,11 +313,17 @@ public void process(final OutputStream out) throws IOException {
             // cap the error limit at 10, format the messages, and don't include the stack trace (it is displayed by the
             // logger message above).
             getLogger().error(nhae.getCustomMessage(10, true, false));
+            if (fileToProcess == null) {
+                fileToProcess = session.create();
+            }
             fileToProcess = session.penalize(fileToProcess);
             session.transfer(fileToProcess, REL_RETRY);
-
         } catch (final QueryExecutionException qee) {
+            //session.rollback();
--- End diff --

There are a few spots of commented out code, please remove if not 
necessary, thanks!


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237138678
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---
@@ -132,17 +134,25 @@
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    // Relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
             .build();
+
+    static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("All input FlowFiles that are part of a successful query execution go here.")
+            .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("A FlowFile is transferred to this relationship if the operation failed.")
+            .description("CQL query execution failed.")
             .build();
+
     public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
-            .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
-                    + "it again may succeed.")
+            .description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting "
--- End diff --

I tend to like `operation` here (see above), and I like that you replaced 
`it` with `operation` for clarity.


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r237147586
  
--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile inputFlowFile = null;
         FlowFile fileToProcess = null;
+
+        Map<String, String> attributes = null;
+
         if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
+            inputFlowFile = session.get();
 
             // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
             // However, if we have no FlowFile and we have connections coming from other Processors, then
             // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
+            if (inputFlowFile == null && context.hasNonLoopConnection()) {
                 return;
             }
+
+            attributes = inputFlowFile.getAttributes();
         }
 
         final ComponentLog logger = getLogger();
-        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
         final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+        final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
         final StopWatch stopWatch = new StopWatch(true);
 
-        if (fileToProcess == null) {
-            fileToProcess = session.create();
+        if(inputFlowFile != null){
+            session.transfer(inputFlowFile, REL_ORIGINAL);
        }
 
         try {
             // The documentation for the driver recommends the session remain open the entire time the processor is running
             // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
             final Session connectionSession = cassandraSession.get();
-            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+            final ResultSet resultSet;
+
+            if (queryTimeout > 0) {
+                resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+            }else{
+                resultSet = connectionSession.execute(selectQuery);
+            }
+
             final AtomicLong nrOfRows = new AtomicLong(0L);
 
-            fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    try {
-                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                        final ResultSet resultSet;
-                        if (queryTimeout > 0) {
-                            resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                            } else if (JSON_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                            }
-                        } else {
-                            resultSet = queryFuture.getUninterruptibly();
-                            if (AVRO_FORMAT.equals(outputFormat)) {
-                                nrOfRows.set(convertToAv

[GitHub] nifi pull request #3184: NIFI-5845: Add support for OTHER and SQLXML JDBC ty...

2018-11-28 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3184#discussion_r237101820
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/util/hive/HiveJdbcCommon.java ---
@@ -402,6 +409,9 @@ public static long convertToCsvStream(final ResultSet rs, final OutputStream out
                         rowValues.add("");
                     }
                     break;
+                case SQLXML:
+                    rowValues.add(StringEscapeUtils.escapeCsv(((java.sql.SQLXML) value).getString()));
--- End diff --

Yep, missed that (nulls are handled earlier in the Avro code, but 
individually in the CSV code); will add both, good catch! I had tested with 
nulls, but only for Avro output.
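
The shape of that fix, as a hedged sketch (mirroring the per-column null handling the CSV path already uses for other types; not the final committed code):

    case SQLXML:
        // Guard against null before dereferencing, since the CSV path handles nulls per column
        rowValues.add(value == null ? "" : StringEscapeUtils.escapeCsv(((java.sql.SQLXML) value).getString()));
        break;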


---


[GitHub] nifi pull request #3184: NIFI-5845: Add support for OTHER and SQLXML JDBC ty...

2018-11-27 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3184

NIFI-5845: Add support for OTHER and SQLXML JDBC types to SQL/Hive 
processors

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5845

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3184


commit a8009f9a0010b99703713a38263006a7a9e5f5d5
Author: Matthew Burgess 
Date:   2018-11-27T23:35:29Z

NIFI-5845: Add support for OTHER and SQLXML JDBC types to SQL/Hive 
processors




---


[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...

2018-11-26 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3179#discussion_r236410289
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java ---
@@ -148,7 +148,23 @@ public void constructProcess() {
         if (e instanceof SQLNonTransientException) {
             return ErrorTypes.InvalidInput;
         } else if (e instanceof SQLException) {
-            return ErrorTypes.TemporalFailure;
+            // Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes
+            int errorCode = ((SQLException) e).getErrorCode();
--- End diff --

I'll add the debug logging for now; we can possibly add errorCode to an 
attribute (as was just done for some SQL processor(s)) under a separate Jira?
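
A minimal sketch of that follow-on idea; the helper and the attribute name `hive.error.code` are purely illustrative, not something this PR adds:

    import java.sql.SQLException;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessSession;

    // Hypothetical helper: surface the vendor error code as a flow file attribute
    static FlowFile tagWithErrorCode(ProcessSession session, FlowFile flowFile, SQLException e) {
        return session.putAttribute(flowFile, "hive.error.code", String.valueOf(e.getErrorCode()));
    }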


---


[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...

2018-11-26 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3179#discussion_r236410267
  
--- Diff: nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3QL.java ---
@@ -148,7 +148,23 @@ public void constructProcess() {
         if (e instanceof SQLNonTransientException) {
             return ErrorTypes.InvalidInput;
         } else if (e instanceof SQLException) {
-            return ErrorTypes.TemporalFailure;
+            // Use the SQLException's vendor code for guidance -- see Hive's ErrorMsg class for details on error codes
+            int errorCode = ((SQLException) e).getErrorCode();
+            if (errorCode >= 1 && errorCode < 2) {
+                return ErrorTypes.InvalidInput;
+            } else if (errorCode >= 2 && errorCode < 3) {
+                return ErrorTypes.TemporalFailure;
--- End diff --

Good point, will change.


---


[GitHub] nifi issue #3167: NIFI-5812: Marked Database processors as 'PrimaryNodeOnly'

2018-11-21 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3167
  
Removing the PrimaryNodeOnly annotation from GTF causes a Checkstyle 
violation due to the unused import. Can you remove the import statement as 
well? Wouldn't hurt to rebase (not merge) as well. Please and thanks!


---


[GitHub] nifi pull request #3179: NIFI-5834: Restore default PutHiveQL error handling...

2018-11-20 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3179

NIFI-5834: Restore default PutHiveQL error handling behavior

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5834

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3179


commit 3e1b98b74476bc534f7039580cfde25bd733e773
Author: Matthew Burgess 
Date:   2018-11-20T22:58:59Z

NIFI-5834: Restore default PutHiveQL error handling behavior




---


[GitHub] nifi issue #3177: NIFI-5828: Documents behavior of ExecuteSQL attrs when Max...

2018-11-19 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3177
  
@colindean mind closing this PR? It doesn't look like the comment in the 
commit caused it to auto-close (maybe the period after the PR number?). Please 
and thanks! Also, I updated the Fix Version to 1.9.0 as 1.8.0 was already 
released


---


[GitHub] nifi issue #3128: NIFI-5788: Introduce batch size limit in PutDatabaseRecord...

2018-11-15 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3128
  
+1 LGTM, tested with various batch sizes and ran unit tests. Thanks for 
this improvement! Merged to master


---


[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...

2018-11-14 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3075#discussion_r233598457
  
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---
@@ -450,7 +450,33 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
 
             // If there are no SQL statements to be generated, still output an empty flow file if specified by the user
             if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) {
-                session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS);
+                FlowFile emptyFlowFile = (fileToProcess == null) ? session.create() : session.create(fileToProcess);
+                Map<String, String> attributesToAdd = new HashMap<>();
+
+                attributesToAdd.put("generatetablefetch.tableName", tableName);
+                if (columnNames != null) {
+                    attributesToAdd.put("generatetablefetch.columnNames", columnNames);
+                }
+                whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
+                if (StringUtils.isNotBlank(whereClause)) {
--- End diff --

Yeah, that's probably left over from before we added the "1=1"; I'll take a 
look and clean it up
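
A sketch of that cleanup, assuming the guard simply disappears once the "1=1" fallback guarantees a non-blank clause (the `generatetablefetch.whereClause` attribute name follows the pattern of the other attributes in the diff above):

    // whereClause can no longer be blank once the "1=1" fallback is in place,
    // so the isNotBlank() guard becomes redundant
    whereClause = maxValueClauses.isEmpty() ? "1=1" : StringUtils.join(maxValueClauses, " AND ");
    attributesToAdd.put("generatetablefetch.whereClause", whereClause);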


---


[GitHub] nifi pull request #3170: NIFI-5652: Fixed LogMessage when logging level is d...

2018-11-14 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3170

NIFI-5652: Fixed LogMessage when logging level is disabled

I couldn't write a unit test as I can't change the log level in between 
unit tests. I reproduced the issue and verified it is no longer present with a 
live NiFi instance running with this patch.

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5652

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3170


commit 6c08aea9a7394237d5e575086fe965001e3700d7
Author: Matthew Burgess 
Date:   2018-11-14T15:50:37Z

NIFI-5652: Fixed LogMessage when logging level is disabled




---


[GitHub] nifi issue #1953: NIFI-4130 Add lookup controller service in TransformXML to...

2018-11-14 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/1953
  
+1 LGTM, thanks for the review @bdesert and the improvement @pvillard31 ! 
Merging to master


---


[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...

2018-11-13 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3075#discussion_r233142376
  
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---
@@ -434,48 +448,53 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
                 numberOfFetches = (partitionSize == 0) ? 1 : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
             }
 
-            // Generate SQL statements to read "pages" of data
-            Long limit = partitionSize == 0 ? null : (long) partitionSize;
-            final String fragmentIdentifier = UUID.randomUUID().toString();
-            for (long i = 0; i < numberOfFetches; i++) {
-                // Add a right bounding for the partitioning column if necessary (only on last partition, meaning we don't need the limit)
-                if ((i == numberOfFetches - 1) && useColumnValsForPaging && (maxValueClauses.isEmpty() || customWhereClause != null)) {
-                    maxValueClauses.add(columnForPartitioning + " <= " + maxValueForPartitioning);
-                    limit = null;
-                }
+            // If there are no SQL statements to be generated, still output an empty flow file if specified by the user
+            if (numberOfFetches == 0 && outputEmptyFlowFileOnZeroResults) {
+                session.transfer((fileToProcess == null) ? session.create() : session.create(fileToProcess), REL_SUCCESS);
--- End diff --

Good call, will make the change.


---


[GitHub] nifi issue #3130: NIFI-5791: Add Apache Daffodil (incubating) bundle

2018-11-12 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3130
  
This is interesting stuff; I need to read up more on DFDL, as it might make a 
good schema definition language as an alternative to Avro schemas. Also, at 
first glance it seems the Daffodil infoset is analogous to a NiFi Record for 
data, and/or a NiFi RecordSchema for its schema. Having an infoset as the 
underlying data model seems to overlap a bit with how the NiFi Record stuff 
works; for example, we could have a PCAPReader and any writer such as 
JSONWriter, and then the ConvertRecord processor could convert PCAP to JSON. I 
think someone was working on a PCAP reader, though I'm not sure how they handled 
the global stuff. If Avro schemas are not sufficient to capture this kind of 
model, then we should definitely look at DFDL as an alternative (we've been 
considering XSD as a more expressive alternative).

Quick note about JOLT: the "normal" use case is JSON-to-JSON conversion, 
but the underlying library does the transformation on a POJO, not JSON. So the 
usual entry point for JOLT is to give it a JSON string; it converts that to a 
POJO, runs the transformation, and writes the resulting POJO back out as JSON. 
In NiFi I wrote JoltTransformRecord, which uses the NiFi Record API instead of 
explicit JSON: it gets each record (in whichever format) as a NiFi Record, 
translates it to a POJO, runs the JOLT transforms, then translates the result 
back into a NiFi Record, which is written out in whatever format is specified 
by the RecordWriter.
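
To make that entry point concrete, a minimal sketch using the JOLT library directly (the spec and input values here are illustrative only):

    import com.bazaarvoice.jolt.Chainr;
    import com.bazaarvoice.jolt.JsonUtils;

    public class JoltPojoExample {
        public static void main(String[] args) {
            // JOLT operates on POJOs (Maps/Lists), not on JSON text itself
            String specJson = "[{\"operation\":\"shift\",\"spec\":{\"id\":\"recordId\"}}]";
            String inputJson = "{\"id\":42}";

            Chainr chainr = Chainr.fromSpec(JsonUtils.jsonToList(specJson));
            Object inputPojo = JsonUtils.jsonToObject(inputJson);   // JSON -> POJO
            Object outputPojo = chainr.transform(inputPojo);        // POJO -> POJO
            System.out.println(JsonUtils.toJsonString(outputPojo)); // {"recordId":42}
        }
    }

JoltTransformRecord skips the JSON-string steps at either end and hands the POJO straight to and from the Record API.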


---


[GitHub] nifi issue #3167: NIFI-5812: Marked Database processors as 'PrimaryNodeOnly'

2018-11-12 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3167
  
I did something like that for the Hive 1 version of PutHiveStreaming: the 
underlying library wasn't thread-safe when working on the same table, so I put 
in a "table lock" so that multiple threads couldn't act on the same table. 
QDT doesn't take incoming flow files, so the table name is effectively 
hard-coded; by adding PrimaryNodeOnly we can guarantee that QDT only has one 
instance (it is already TriggerSerially, so it can't have multiple threads). For 
GTF, if you use ListDatabaseTables on the primary node only (not sure if we 
should force that with this annotation or not), then each flow file should have 
a different table, and using a load-balanced connection (or RPG -> Input Port), 
each instance of GTF should be working on a different table. The onus is 
on the user to set Max Concurrent Tasks for GTF to 1 to prevent multi-threaded 
execution.
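
The "table lock" idea, as an illustrative sketch (not the actual PutHiveStreaming code): serialize work per table name while letting different tables proceed in parallel.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReentrantLock;

    public class TableLocks {
        // One lock per table name, created lazily and shared by all threads
        private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

        public void withTableLock(String tableName, Runnable work) {
            ReentrantLock lock = locks.computeIfAbsent(tableName, t -> new ReentrantLock());
            lock.lock();
            try {
                work.run(); // only one thread at a time per table
            } finally {
                lock.unlock();
            }
        }
    }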

Perhaps for GTF, instead of forcing PrimaryNodeOnly, we can make it clear 
in the docs that if there are no incoming connections, it should probably be run 
on the primary node only. Alternatively, maybe during annotation processing we 
can enforce that processors annotated with PrimaryNodeOnly automatically have 
an InputRequirement of INPUT_FORBIDDEN; otherwise flow files can get stalled if 
they are queued on a node that is not the primary.
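
What that combination would amount to on a source processor, sketched with the existing nifi-api annotations (the processor itself is hypothetical):

    import org.apache.nifi.annotation.behavior.InputRequirement;
    import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
    import org.apache.nifi.annotation.behavior.TriggerSerially;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.exception.ProcessException;

    @PrimaryNodeOnly
    @TriggerSerially
    @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    public class ExampleSourceProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // no-op; the combination of annotations is the point of this sketch
        }
    }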


---


[GitHub] nifi pull request #3167: NIFI-5812: Marked Database processors as 'PrimaryNo...

2018-11-12 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3167#discussion_r232697514
  
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java ---
@@ -114,6 +114,7 @@
         + "max value for max value columns. Properties should be added in the format `initial.maxvalue.`. This value is only used the first time "
         + "the table is accessed (when a Maximum Value Column is specified). In the case of incoming connections, the value is only used the first time for each table "
         + "specified in the flow files.")
+@PrimaryNodeOnly
--- End diff --

Since GenerateTableFetch can accept incoming flow files, I don't think we 
should restrict this to run only on the primary node. For example, you could do 
a ListDatabaseTables, then distribute the flow files among the cluster, where 
each node's downstream GTF could do the fetch in parallel. In fact that's the 
main reason we have GTF rather than just QueryDatabaseTable.


---


[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...

2018-11-08 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3133#discussion_r232011596
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java ---
@@ -164,6 +166,77 @@ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
 
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Minimum Idle Connections")
+            .name("dbcp-mim-idle-conns")
+            .description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
+                    "created, or zero to create none.")
+            .defaultValue(String.valueOf(GenericObjectPoolConfig.DEFAULT_MIN_IDLE))
--- End diff --

I didn't realize the constants were only available from a class in an 
"impl" package in DBCP. In that case I take back what I said :(  Can we create 
our own constants here with a comment that we got the values from DBCP itself?


---


[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...

2018-11-07 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3133#discussion_r231582876
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java ---
@@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
 
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Minimum Idle Connections")
+            .name("dbcp-mim-idle-conns")
+            .description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
+                    "created, or zero to create none.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Max Idle Connections")
+            .name("dbcp-max-idle-conns")
+            .description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
+                    "released, or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+            .displayName("Max Connection Lifetime")
+            .name("dbcp-max-conn-lifetime")
+            .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+                    "connection will fail the next activation, passivation or validation test. A value of zero or less " +
+                    "means the connection has an infinite lifetime.")
+            .defaultValue("-1")
+            .required(true)
+            .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
+            .displayName("Time Between Eviction Runs")
+            .name("dbcp-time-between-eviction-runs")
+            .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
+                    "non-positive, no idle connection evictor thread will be run.")
+            .defaultValue("-1")
+            .required(true)
+            .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
+            .displayName("Minimum Evictable Idle Time")
+            .name("dbcp-min-evictable-idle-time")
+            .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
+            .defaultValue("1800 secs")
+            .required(true)
+            .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
--- End diff --

The validator supports expression language, so I think these properties 
should support expression language at the Variable Registry level
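
Sketched against one of the new descriptors (reusing the PR's CUSTOM_TIME_PERIOD_VALIDATOR from the same class), the requested change is just the extra builder call:

    public static final PropertyDescriptor MIN_EVICTABLE_IDLE_TIME = new PropertyDescriptor.Builder()
            .displayName("Minimum Evictable Idle Time")
            .name("dbcp-min-evictable-idle-time")
            .description("The minimum amount of time a connection may sit idle in the pool before it is eligible for eviction.")
            .defaultValue("1800 secs")
            .required(true)
            .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) // the addition
            .build();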


---


[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...

2018-11-07 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3133#discussion_r231582349
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java ---
@@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
 
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Minimum Idle Connections")
+            .name("dbcp-mim-idle-conns")
+            .description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
+                    "created, or zero to create none.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Max Idle Connections")
+            .name("dbcp-max-idle-conns")
+            .description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
+                    "released, or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+            .displayName("Max Connection Lifetime")
+            .name("dbcp-max-conn-lifetime")
+            .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+                    "connection will fail the next activation, passivation or validation test. A value of zero or less " +
+                    "means the connection has an infinite lifetime.")
+            .defaultValue("-1")
+            .required(true)
+            .addValidator(CUSTOM_TIME_PERIOD_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor EVICTION_RUN_PERIOD = new PropertyDescriptor.Builder()
+            .displayName("Time Between Eviction Runs")
+            .name("dbcp-time-between-eviction-runs")
+            .description("The number of milliseconds to sleep between runs of the idle connection evictor thread. When " +
+                    "non-positive, no idle connection evictor thread will be run.")
+            .defaultValue("-1")
--- End diff --

If the default values are available as constants from DBCP, we should 
probably use those. If we want to limit "zero or less" to just zero, we can 
just do an additional Math.max()
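
The Math.max() idea, as a one-line sketch wrapped in a hypothetical helper:

    // Collapse "zero or less" to exactly zero before handing the value to DBCP
    static long clampToZero(final Long millis) {
        return millis == null ? 0L : Math.max(0L, millis);
    }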


---


[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...

2018-11-07 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3133#discussion_r231580669
  
--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java ---
@@ -164,6 +164,71 @@ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
 
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor MIN_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Minimum Idle Connections")
+            .name("dbcp-mim-idle-conns")
+            .description("The minimum number of connections that can remain idle in the pool, without extra ones being " +
+                    "created, or zero to create none.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_IDLE = new PropertyDescriptor.Builder()
+            .displayName("Max Idle Connections")
+            .name("dbcp-max-idle-conns")
+            .description("The maximum number of connections that can remain idle in the pool, without extra ones being " +
+                    "released, or negative for no limit.")
+            .defaultValue("8")
+            .required(true)
+            .addValidator(StandardValidators.INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
+            .displayName("Max Connection Lifetime")
+            .name("dbcp-max-conn-lifetime")
+            .description("The maximum lifetime in milliseconds of a connection. After this time is exceeded the " +
+                    "connection will fail the next activation, passivation or validation test. A value of zero or less " +
+                    "means the connection has an infinite lifetime.")
+            .defaultValue("-1")
--- End diff --

Even if the API allows zero or less, we could just say zero means infinite 
and use a NONNEGATIVE_INTEGER_VALIDATOR
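
Roughly like this (a sketch; the actual constant is spelled 
StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR):

```
static final PropertyDescriptor MAX_CONN_LIFETIME = new PropertyDescriptor.Builder()
        .displayName("Max Connection Lifetime")
        .name("dbcp-max-conn-lifetime")
        .description("The maximum lifetime in milliseconds of a connection. Zero means the connection has an infinite lifetime.")
        .defaultValue("0")
        .required(true)
        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
        .build();
```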


---


[GitHub] nifi pull request #3133: NIFI-5790: Exposes 6 commons-dbcp options in DBCPCo...

2018-11-07 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3133#discussion_r231580368
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
 ---
@@ -164,6 +161,71 @@ public ValidationResult validate(final String subject, 
final String input, final
 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 
+public static final PropertyDescriptor MIN_IDLE = new 
PropertyDescriptor.Builder()
+.name("Minimum Idle Connections")
+.description("The minimum number of connections that can 
remain idle in the pool, without extra ones being " +
+"created, or zero to create none.")
+.defaultValue("0")
+.required(true)
+.addValidator(StandardValidators.INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor MAX_IDLE = new 
PropertyDescriptor.Builder()
+.name("Max Idle Connections")
+.description("The maximum number of connections that can 
remain idle in the pool, without extra ones being " +
+"released, or negative for no limit.")
+.defaultValue("8")
--- End diff --

IMO we should keep the default value as the one used when you don't 
explicitly set it, I guess that's 8?


---


[GitHub] nifi issue #2287: NIFI-4625 - Added External Version to the PutElastic5 Proc...

2018-11-06 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2287
  
There has been some recent interest in this feature, do you think you'll be 
able to continue with this? Seems like a good feature to add.


---


[GitHub] nifi pull request #3128: NIFI-5788: Introduce batch size limit in PutDatabas...

2018-11-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3128#discussion_r230812123
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 ---
@@ -265,6 +265,17 @@
 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-db-record-batch-size")
+.displayName("Bulk Size")
+.description("Specifies batch size for INSERT and UPDATE 
statements. This parameter has no effect for other statements specified in 
'Statement Type'."
++ " Non-positive value has the effect of infinite bulk 
size.")
+.defaultValue("-1")
--- End diff --

What does a value of zero do? Would anyone ever use it? If not, perhaps 
zero is the best default to indicate infinite bulk size. If you do change it to 
zero, please change the validator to a NONNEGATIVE_INTEGER_VALIDATOR to match


---


[GitHub] nifi pull request #3128: NIFI-5788: Introduce batch size limit in PutDatabas...

2018-11-05 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3128#discussion_r230811717
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 ---
@@ -265,6 +265,17 @@
 
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
 
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
--- End diff --

We should be consistent here with "batch size" and "bulk size" in the 
naming of variables, documentation, etc. Maybe "Maximum Batch Size"?


---


[GitHub] nifi issue #3117: NIFI-5770 Fix Memory Leak in ExecuteScript on Jython

2018-11-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3117
  
+1 LGTM, ran unit tests and tried a Jython script in a live NiFi instance. 
Thanks for the improvement! Merging to master


---


[GitHub] nifi issue #3123: NIFI-5781 - Incorrect schema for provenance events in Site...

2018-11-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3123
  
+1 LGTM, verified the documentation looks correct, and used the output S2S 
Prov Reporting Task in various record-based processors with the updated schema. 
Thanks for the improvement! Merging to master


---


[GitHub] nifi issue #3041: NIFI-5224 Added SolrClientService.

2018-10-29 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3041
  
Mind updating the versions to 1.9.0-SNAPSHOT? Please and thanks!


---


[GitHub] nifi issue #3105: NIFI-5621: Added Cassandra connection provider service

2018-10-29 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3105
  
Mind rebasing this and setting all 1.8.0-SNAPSHOT versions to 
1.9.0-SNAPSHOT? Please and thanks!


---


[GitHub] nifi issue #3107: NIFI-5744: Put exception message to attribute while Execut...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3107
  
Just to connect the dots here, there is some dissent from the PMC about 
this approach, so we should do our best to reach consensus on the mailing list 
before this PR is merged, in case the direction changes (switch from an 
attribute to additional outgoing relationships, e.g.)


---


[GitHub] nifi issue #3020: NIFI-5625: support the variables for the properties of HTT...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3020
  
Mind rebasing this against the latest master? Please and thanks!


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228325643
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 ---
@@ -132,17 +132,25 @@
 .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
 .build();
 
+// Relationships
 public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
 .name("success")
-.description("A FlowFile is transferred to this relationship 
if the operation completed successfully.")
+.description("Successfully created FlowFile from CQL query 
result set.")
--- End diff --

This relationship is reused by PutCassandraQL as well, where there is no 
result set or query per se (it's a statement). That's why the doc is so 
generic. If you'd like to have the doc be more specific, you can create a 
REL_SUCCESS relationship in QueryCassandra using `new 
Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your
 description override").build()`


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228325897
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -33,7 +32,7 @@
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
-import org.apache.commons.text.StringEscapeUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
--- End diff --

I think we're on the verge of using Apache Commons Text instead of Commons 
Lang 3, maybe consider keeping this the way it is?


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228332165
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -400,77 +478,87 @@ public static long convertToJsonStream(final 
ResultSet rs, final OutputStream ou
 outStream.write("{\"results\":[".getBytes(charset));
 final ColumnDefinitions columnDefinitions = 
rs.getColumnDefinitions();
 long nrOfRows = 0;
+long rowsAvailableWithoutFetching = 
rs.getAvailableWithoutFetching();
+
 if (columnDefinitions != null) {
-do {
-
-// Grab the ones we have
-int rowsAvailableWithoutFetching = 
rs.getAvailableWithoutFetching();
-if (rowsAvailableWithoutFetching == 0) {
-// Get more
-if (timeout <= 0 || timeUnit == null) {
-rs.fetchMoreResults().get();
-} else {
-rs.fetchMoreResults().get(timeout, timeUnit);
-}
+
+// Grab the ones we have
+if (rowsAvailableWithoutFetching == 0) {
+// Get more
+if (timeout <= 0 || timeUnit == null) {
+rs.fetchMoreResults().get();
+} else {
+rs.fetchMoreResults().get(timeout, timeUnit);
 }
+rowsAvailableWithoutFetching = 
rs.getAvailableWithoutFetching();
+}
 
-for (Row row : rs) {
-if (nrOfRows != 0) {
+if(maxRowsPerFlowFile == 0){
+maxRowsPerFlowFile = rowsAvailableWithoutFetching;
+}
+Row row;
+while(nrOfRows < maxRowsPerFlowFile){
+try {
+row = rs.iterator().next();
+}catch (NoSuchElementException nsee){
+//nrOfRows -= 1;
--- End diff --

This is commented out here but active in the Avro version above, I assume 
they need to be the same?


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228331847
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext 
context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map<String, String> attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections 
coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = 
context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session 
remain open the entire time the processor is running
 // and states that it is thread-safe. This is why 
connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = 
connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, 
queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws 
IOException {
-try {
-logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = 
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, 
TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, 
TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = queryFuture.getUninterruptibly();
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAv

---

[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228327507
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext 
context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map<String, String> attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections 
coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = 
context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session 
remain open the entire time the processor is running
 // and states that it is thread-safe. This is why 
connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = 
connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, 
queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws 
IOException {
-try {
-logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = 
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, 
TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, 
TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = queryFuture.getUninterruptibly();
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAv

---

[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-25 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r228328178
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext 
context) {
 
 @Override
 public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+FlowFile inputFlowFile = null;
 FlowFile fileToProcess = null;
+
+Map<String, String> attributes = null;
+
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections 
coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+
+attributes = inputFlowFile.getAttributes();
 }
 
 final ComponentLog logger = getLogger();
-final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+final String selectQuery = 
context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+final long queryTimeout = 
context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
 final String outputFormat = 
context.getProperty(OUTPUT_FORMAT).getValue();
-final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+final long maxRowsPerFlowFile = 
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+final Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
 final StopWatch stopWatch = new StopWatch(true);
 
-if (fileToProcess == null) {
-fileToProcess = session.create();
+if(inputFlowFile != null){
+session.transfer(inputFlowFile, REL_ORIGINAL);
 }
 
 try {
 // The documentation for the driver recommends the session 
remain open the entire time the processor is running
 // and states that it is thread-safe. This is why 
connectionSession is not in a try-with-resources.
 final Session connectionSession = cassandraSession.get();
-final ResultSetFuture queryFuture = 
connectionSession.executeAsync(selectQuery);
+final ResultSet resultSet;
+
+if (queryTimeout > 0) {
+resultSet = connectionSession.execute(selectQuery, 
queryTimeout, TimeUnit.MILLISECONDS);
+}else{
+resultSet = connectionSession.execute(selectQuery);
+}
+
 final AtomicLong nrOfRows = new AtomicLong(0L);
 
-fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
-@Override
-public void process(final OutputStream out) throws 
IOException {
-try {
-logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
-final ResultSet resultSet;
-if (queryTimeout > 0) {
-resultSet = 
queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, 
TimeUnit.MILLISECONDS));
-} else if (JSON_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, 
TimeUnit.MILLISECONDS));
-}
-} else {
-resultSet = queryFuture.getUninterruptibly();
-if (AVRO_FORMAT.equals(outputFormat)) {
-
nrOfRows.set(convertToAv

---

[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-24 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
+1 LGTM, tested successfully. Thanks for the fix! Merging to master


---


[GitHub] nifi issue #3107: NIFI-5744: Put exception message to attribute while Execut...

2018-10-24 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3107
  
Reviewing...


---


[GitHub] nifi issue #3104: NIFI-5740 Ensuring permissions are restored after test com...

2018-10-23 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3104
  
+1 LGTM, verified I couldn't delete the directories manually without this 
change, and I can afterwards. Thanks for the fix! Merging to master


---


[GitHub] nifi issue #3094: NIFI-5727 - Optimize GenerateTableFetch to remove unnecess...

2018-10-23 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3094
  
+1 LGTM, ran regression tests and tried on a NiFi instance. Thanks for the 
improvement! Merging to master


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-23 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
Not sure if you want to try another rebase to see if the repo problem is 
gone (and the realm problem is fixed), or just @Ignore the test. I'm fine with 
either, let me know when you're happy with it (and have tested it even if 
there's no unit test enabled) and I'll finish the review/merge. Thanks!


---


[GitHub] nifi issue #3092: NIFI-5525 - CSVRecordReader fails with StringIndexOutOfBou...

2018-10-22 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3092
  
Pierre made the change Peter recommended (I verified). I think we need it 
in now as the current Jira is only partially complete for 1.8.0 without it.

Peter, I'm going to go ahead and merge this so we can cut the RC, please 
let me know if you disagree and we can discuss further; otherwise I suspect 
we're good to go, thanks all!


---


[GitHub] nifi issue #3101: NIFI-5736: Removed 'synchronized' keyword from SwappablePr...

2018-10-22 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3101
  
+1 LGTM, ran full build with tests and tried a live NiFi instance. Thanks 
for the fix! Merging to master


---


[GitHub] nifi pull request #3092: NIFI-5525 - CSVRecordReader fails with StringIndexO...

2018-10-22 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3092#discussion_r227045025
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
 ---
@@ -132,6 +131,10 @@ protected final Object convertSimpleIfPossible(final 
String value, final DataTyp
 return value;
 }
 
+private String trim(String value) {
+return value.startsWith("\"") && value.endsWith("\"") && 
(value.length() > 1) ? value.substring(1, value.length() - 1) : value;
--- End diff --

Doesn't seem like it would offer a significant performance improvement on 
average, only if most of the strings to be trimmed were single characters. I'm 
+1 on this one if you are, I can do the merge (to get this into 1.8.0 RC3) if 
you like.
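
For reference, a sketch of the reordering presumably under discussion (length check 
first, so single-character strings short-circuit):

```
private String trim(String value) {
    // same semantics as the patch: strip the surrounding quotes only when both are present
    return (value.length() > 1) && value.startsWith("\"") && value.endsWith("\"")
            ? value.substring(1, value.length() - 1) : value;
}
```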


---


[GitHub] nifi issue #3079: NIFI-5706 : Added ConvertAvroToParquet processor

2018-10-18 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3079
  
What is the use case for having Parquet files flow through NiFi? I wrote 
ConvertAvroToORC and every time I saw it used, it was immediately followed by 
PutHDFS. So I followed the PutParquet lead and wrote a PutORC for the Hive 3 
NAR. We can't currently do anything with a Parquet file (or ORC for that 
matter) in NiFi, so just curious as to how you envision it being used.

Also I wonder if a ParquetRecordWriter might be a better idea? It would do 
the same thing as this processor, but record-based processors can read input 
in any format, not just Avro. This was another impetus for having PutORC 
instead of ConvertAvroToORC.


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-18 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
Doesn't look like the tests were run due to a bad artifact. Maybe rebase 
the PR (squashing just the commits where you tried individual things?) and 
force push, hopefully it'll go through this time :( Sorry this is such a pain.


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-18 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
The krb5.conf from the TestRangerNiFiAuthorizer looks like this:

```
[libdefaults]
 default_realm = EXAMPLE.COM
 dns_lookup_kdc = false
 dns_lookup_realm = false

[realms]
 EXAMPLE.COM = {
 kdc = kerberos.example.com
 admin_server = kerberos.example.com
 }
```

And doesn't have the setting of the `java.security.krb5.realm` or 
`java.security.krb5.kdc` (I assume because they are unnecessary based on the 
dns_lookup_* properties?). Might be worth a try...


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-17 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
First one didn't work :(


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-17 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
Same here, the relevant error in the Travis log is "Cannot locate default 
realm". Looking at other tests that load krb5.conf, one 
(TestHBase_1_1_2_ClientService) has this:

```
// needed for calls to UserGroupInformation.setConfiguration() to work when 
passing in
// config with Kerberos authentication enabled
System.setProperty("java.security.krb5.realm", "nifi.com");
System.setProperty("java.security.krb5.kdc", "nifi.kdc");
```

and another (TestRangerNiFiAuthorizer) has this:

```
// have to initialize this system property before anything else
File krb5conf = new File("src/test/resources/krb5.conf");
assertTrue(krb5conf.exists());
System.setProperty("java.security.krb5.conf", krb5conf.getAbsolutePath());
```

Perhaps one or both of these would fix the issue? Not sure how to 
reproduce, I guess you could try one and push the commit to see if Travis 
succeeds...


---


[GitHub] nifi issue #3086: NIFI-5714 - Hive[3]ConnectionPool - Kerberos Authenticatio...

2018-10-17 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3086
  
Looks like the unit test is throwing the wrong exception, 
IllegalArgumentException instead of InitializationException


---


[GitHub] nifi issue #3081: NIFI-5708 Fixing the creation of ValidationContextFactory ...

2018-10-16 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3081
  
+1 LGTM, thanks for the fix! Merging to master


---


[GitHub] nifi pull request #3076: NIFI-5705: Added Hive 3 attribution to nifi-assembl...

2018-10-15 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3076

NIFI-5705: Added Hive 3 attribution to nifi-assembly and top-level NOTICEs

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5705

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3076.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3076


commit e4c1fc0d8e6bbe6ca4b07f7556f248567721a268
Author: Matthew Burgess 
Date:   2018-10-15T23:34:05Z

NIFI-5705: Added Hive 3 attribution to nifi-assembly and top-level NOTICEs




---


[GitHub] nifi pull request #3075: NIFI-5604: Added property to allow empty FlowFile w...

2018-10-15 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3075

NIFI-5604: Added property to allow empty FlowFile when no SQL generated by 
GenerateTableFetch

Note that this should work with NIFI-5601 (#3074) such that the fragment 
attributes would be written to the empty flow file as well. Depending on which 
gets merged first, the other probably needs to be updated.

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5604

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3075


commit 43bfe9531a0aa2a64179d27b4196a6062f60ebbe
Author: Matthew Burgess 
Date:   2018-10-15T20:27:38Z

NIFI-5604: Added property to allow empty FlowFile when no SQL generated by 
GenerateTableFetch




---


[GitHub] nifi pull request #3074: NIFI-5601: Add fragment.* attributes to GenerateTab...

2018-10-15 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3074

NIFI-5601: Add fragment.* attributes to GenerateTableFetch

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5601

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3074.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3074


commit bd6ce6d8c74b65a222fb2a842465e5aae2d40272
Author: Matthew Burgess 
Date:   2018-10-15T20:07:13Z

NIFI-5601: Add fragment.* attributes to GenerateTableFetch




---


[GitHub] nifi issue #3062: NIFI-5686: Updated StandardProcessScheduler so that if it ...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3062
  
+1 LGTM, ran full build with unit tests, tried on a live instance with 
reporting tasks and various other flows. Thanks for the improvement! Merging to 
master


---


[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2861#discussion_r225216435
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.jayway.jsonpath.JsonPath;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticSearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.elasticsearch.put.FlowFileJsonDescription;
+import org.apache.nifi.processors.elasticsearch.put.JsonProcessingError;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", 
"index"})
+@CapabilityDescription("This processor puts user-supplied JSON into 
ElasticSearch. It does not require a schema.")
+public class PutElasticsearchJson extends AbstractProcessor implements 
ElasticSearchRestProcessor {
+
+static final PropertyDescriptor ID_ATTRIBUTE = new 
PropertyDescriptor.Builder()
+.name("put-es-json-id-attribute")
+.displayName("ID Attribute")
+.description("The attribute to use for setting the document ID in 
ElasticSearch. If the payload is an array, " +
+"and this option is used for getting the ID, an exception 
will be raised.")
+.required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(Validator.VALID)
+.build();
+static final PropertyDescriptor ID_JSON_PATH = new 
PropertyDescriptor.Builder()
+.name("put-es-json-id-json-path")
+.displayName("ID JSONPath")
+.description("If set, the document ID will be pulled from each 
json block using this JSONPath operation.")
--- End diff --

Nitpick, "json" should be "JSON" everywhere in the doc for consistency


---


[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2861#discussion_r225215375
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/pom.xml
 ---
@@ -107,6 +146,38 @@ language governing permissions and limitations under 
the License. -->
 
 
 
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-compiler-plugin</artifactId>
--- End diff --

I think we talked about this before but can't find it. Does it need to be 
here? I thought we could inherit the one from the top-level parent POM?


---


[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2861#discussion_r225215710
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticSearchRestProcessor.java
 ---
@@ -75,6 +76,19 @@
 .required(true)
 .build();
 
+Relationship REL_FAILURE = new Relationship.Builder()
+.name("failure")
+.description("All flowfiles that fail for reasons unrelated to 
server availability go to this relationship.")
+.build();
+Relationship REL_RETRY = new Relationship.Builder()
+.name("retry")
+.description("All flowfiles that fail due to server/cluster 
availability go to this relationship.")
+.build();
+Relationship REL_SUCCESS = new Relationship.Builder()
+.name("success")
+.description("All flowfiles that succeed in being indexed into 
ElasticSearch go here.")
--- End diff --

Maybe "transmitted to ElasticSearch"? Unless "indexed" is a generic enough 
term that users would know covers upserts, deletes, and any other supported 
operation.


---


[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2861#discussion_r225208197
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 ---
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticSearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", 
"index", "record"})
+@CapabilityDescription("A record-aware ElasticSearch put processor that 
uses the official Elastic REST client libraries.")
+public class PutElasticsearchRecord extends AbstractProcessor implements 
ElasticSearchRestProcessor {
+static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("put-es-record-reader")
+.displayName("Record Reader")
+.description("The record reader to use for reading incoming 
records from flowfiles.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor OPERATION_RECORD_PATH = new 
PropertyDescriptor.Builder()
--- End diff --

If this is specified, does the field remain in the document? What if the 
user didn't want to have that field included in the document, but needed it in 
order to specify the operation as a field?

What if the index operation were in an attribute versus a record field? 
Like if you partitioned records based on whether they were upsert vs delete 
operations (for example). Perhaps consider something like UpdateRecord's 
"Replacement Value Strategy" property, which allows you to choose whether the 
Operation field would be evaluated as a Record Path or a Literal Value. Both 
can support EL, but the latter would allow the setting of a single operation 
for all the records in the flow file.
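
A hypothetical sketch of that strategy property (the names and values below are 
made up for illustration, not part of the PR):

```
static final AllowableValue OP_RECORD_PATH = new AllowableValue("record-path", "Record Path",
        "Evaluate the Operation property as a Record Path against each record.");
static final AllowableValue OP_LITERAL = new AllowableValue("literal-value", "Literal Value",
        "Use the Operation property (Expression Language supported) as a single operation for all records in the FlowFile.");

static final PropertyDescriptor OPERATION_STRATEGY = new PropertyDescriptor.Builder()
        .name("put-es-record-operation-strategy")
        .displayName("Operation Value Strategy")
        .description("Whether the Operation value is evaluated as a Record Path or as a Literal Value.")
        .allowableValues(OP_RECORD_PATH, OP_LITERAL)
        .defaultValue(OP_RECORD_PATH.getValue())
        .required(true)
        .build();
```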


---


[GitHub] nifi pull request #2861: NIFI-5248 Added new Elasticsearch json and record p...

2018-10-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2861#discussion_r225208562
  
--- Diff: 
nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 ---
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.elasticsearch.ElasticSearchClientService;
+import org.apache.nifi.elasticsearch.ElasticSearchError;
+import org.apache.nifi.elasticsearch.IndexOperationRequest;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", 
"index", "record"})
+@CapabilityDescription("A record-aware ElasticSearch put processor that 
uses the official Elastic REST client libraries.")
+public class PutElasticsearchRecord extends AbstractProcessor implements 
ElasticSearchRestProcessor {
+static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("put-es-record-reader")
+.displayName("Record Reader")
+.description("The record reader to use for reading incoming 
records from flowfiles.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor OPERATION_RECORD_PATH = new 
PropertyDescriptor.Builder()
+.name("put-es-record-operation-path")
+.displayName("Operation Record Path")
+.description("A record path expression to retrieve index operation 
setting from each record. If left blank, " +
+"all operations will be assumed to be index operations.")
+.addValidator(new RecordPathValidator())
+.required(false)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor ID_RECORD_PATH = new 
PropertyDescriptor.Builder()
+.name("put-es-record-id-path")
+.displayName("ID Record Path")
+.description("A record path expression to retrieve the ID field 
for use with ElasticSearch.

---

[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC

2018-10-12 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3057
  
Updated the rest of the backticks, thanks @VikingK and @bbende for your 
reviews!


---


[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC

2018-10-11 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3057
  
I found the issue and pushed a new commit with the fix. Your test data also 
exercised another part of the code I hadn't thought of: your "as" field is a 
keyword in Hive, so when I used the generated hive.ddl attribute to create the 
table on top of the ORC files, it didn't work. The other change in this commit 
is to backtick-quote the field names, to protect against field names that are 
reserved words.
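
The quoting rule itself is tiny; a hypothetical illustration (the method name is 
made up, not the actual NiFiOrcUtils code):

```
static String quoteForHiveDdl(String fieldName) {
    return "`" + fieldName + "`"; // "as" becomes `as`, safe even when the field name is a Hive keyword
}
```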


---


[GitHub] nifi pull request #3060: NIFI-5678: Fixed MAP type support of MapRecord obje...

2018-10-10 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3060

NIFI-5678: Fixed MAP type support of MapRecord objects in 
StandardSchemaValidator

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5678

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3060.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3060


commit c66de368d07f37a8652c09e4b116b232972821ac
Author: Matthew Burgess 
Date:   2018-10-10T19:01:40Z

NIFI-5678: Fixed MAP type support of MapRecord objects in 
StandardSchemaValidator




---


[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC

2018-10-10 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3057#discussion_r224126176
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
 ---
@@ -157,19 +155,17 @@ public String getDefaultCompressionType(final 
ProcessorInitializationContext con
 public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext 
context, final FlowFile flowFile, final Configuration conf, final Path path, 
final RecordSchema schema)
 throws IOException, SchemaNotFoundException {
 
-final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-
 final long stripeSize = 
context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
 final int bufferSize = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
 final CompressionKind compressionType = 
CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
 final boolean normalizeForHive = 
context.getProperty(HIVE_FIELD_NAMES).asBoolean();
-TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
normalizeForHive);
+TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, 
normalizeForHive);
 final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, 
orcSchema, stripeSize, compressionType, bufferSize);
 final String hiveTableName = 
context.getProperty(HIVE_TABLE_NAME).isSet()
 ? 
context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
-: 
NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName());
+: 
NiFiOrcUtils.normalizeHiveTableName(schema.toString());// TODO
--- End diff --

Yep that's not the right thing to put there :) Will investigate getting a 
name from the record somehow, or defaulting to a hardcoded table name if none 
is provided.


---


[GitHub] nifi issue #3057: NIFI-5667: Add nested record support for PutORC

2018-10-10 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3057
  
Can you share the schema of the data, and possibly a JSON export of your 
Avro file? I couldn't reproduce this with an array of ints, and @bbende ran 
successfully with an array of records.


---


[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC

2018-10-09 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3057#discussion_r223893228
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/orc/PutORC.java
 ---
@@ -157,19 +155,17 @@ public String getDefaultCompressionType(final 
ProcessorInitializationContext con
 public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext 
context, final FlowFile flowFile, final Configuration conf, final Path path, 
final RecordSchema schema)
 throws IOException, SchemaNotFoundException {
 
-final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
-
 final long stripeSize = 
context.getProperty(STRIPE_SIZE).asDataSize(DataUnit.B).longValue();
 final int bufferSize = 
context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
 final CompressionKind compressionType = 
CompressionKind.valueOf(context.getProperty(COMPRESSION_TYPE).getValue());
 final boolean normalizeForHive = 
context.getProperty(HIVE_FIELD_NAMES).asBoolean();
-TypeInfo orcSchema = NiFiOrcUtils.getOrcField(avroSchema, 
normalizeForHive);
+TypeInfo orcSchema = NiFiOrcUtils.getOrcSchema(schema, 
normalizeForHive);
 final Writer orcWriter = NiFiOrcUtils.createWriter(path, conf, 
orcSchema, stripeSize, compressionType, bufferSize);
 final String hiveTableName = 
context.getProperty(HIVE_TABLE_NAME).isSet()
 ? 
context.getProperty(HIVE_TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue()
-: 
NiFiOrcUtils.normalizeHiveTableName(avroSchema.getFullName());
+: 
NiFiOrcUtils.normalizeHiveTableName(schema.toString());// TODO
--- End diff --

I admit I hadn't tested this part; the TODO should be removed, but we likely 
need a way to get at the "name" of the top-level record if the Hive Table 
Name property is not set. Then again, I haven't seen anyone rely on the 
schema's full name as the table name; the Hive Table Name property is the 
recommended way to set this for the generated DDL. Welcome all comments 
though :)
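
Something like this fallback, perhaps (rough sketch; the default name is 
hypothetical):

    // Prefer the Hive Table Name property; fall back to the record
    // schema's name if one is present; otherwise use a hardcoded default.
    final String tableName = context.getProperty(HIVE_TABLE_NAME).isSet()
            ? context.getProperty(HIVE_TABLE_NAME)
                    .evaluateAttributeExpressions(flowFile).getValue()
            : schema.getIdentifier().getName()
                    .map(NiFiOrcUtils::normalizeHiveTableName)
                    .orElse("nifi_orc_table");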


---


[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC

2018-10-09 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3057#discussion_r223892664
  
--- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/hadoop/hive/ql/io/orc/NiFiOrcUtils.java
 ---
@@ -163,19 +161,30 @@ public static Object convertToORCObject(TypeInfo 
typeInfo, Object o, final boole
 .mapToObj((element) -> 
convertToORCObject(TypeInfoFactory.getPrimitiveTypeInfo("boolean"), element == 
1, hiveFieldNames))
 .collect(Collectors.toList());
 }
-if (o instanceof GenericData.Array) {
-GenericData.Array array = ((GenericData.Array) o);
-// The type information in this case is interpreted as a 
List
-TypeInfo listTypeInfo = ((ListTypeInfo) 
typeInfo).getListElementTypeInfo();
-return array.stream().map((element) -> 
convertToORCObject(listTypeInfo, element, 
hiveFieldNames)).collect(Collectors.toList());
-}
 if (o instanceof List) {
 return o;
 }
+if (o instanceof Record) {
--- End diff --

This is actually the part that fixes the nested records issue. The rest 
follows from it: from a Record we can only get RecordSchema info, whereas the 
original util methods required Avro schema/type info. The other changes are a 
consequence of this, replacing Avro-specific stuff with NiFi Record API stuff.
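
For anyone following along, the new branch looks roughly like this 
(simplified from the diff above; field-name normalization for Hive is 
omitted here):

    if (o instanceof Record) {
        final Record record = (Record) o;
        final StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
        final List<Object> structObjects = new ArrayList<>();
        // Convert each field recursively using the matching struct
        // field's TypeInfo, so nested records become ORC structs.
        for (final RecordField field : record.getSchema().getFields()) {
            final TypeInfo fieldType = structTypeInfo.getStructFieldTypeInfo(field.getFieldName());
            structObjects.add(convertToORCObject(fieldType, record.getValue(field), hiveFieldNames));
        }
        return structObjects;
    }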


---


[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC

2018-10-09 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3057#discussion_r223892405
  
--- Diff: 
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java
 ---
@@ -58,6 +58,10 @@ public void addSchemaField(final String fieldName, final 
RecordFieldType type, b
 fields.add(new RecordField(fieldName, type.getDataType(), 
isNullable));
 }
 
+public void addSchemaField(final RecordField recordField) {
--- End diff --

This preserves the full data type of the field. 
RecordFieldType.getDataType(), which is called from the other 
addSchemaField() methods, returns the "base" type, such as ARRAY instead of 
ARRAY[INT], for equality purposes.
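
To illustrate the difference (hedged example using the Record API types):

    // New overload: keeps the full type, e.g. ARRAY[INT]
    parser.addSchemaField(new RecordField("ids",
            new ArrayDataType(RecordFieldType.INT.getDataType())));

    // Existing overloads go through RecordFieldType.getDataType(), which
    // returns the base type (just ARRAY), dropping the element type:
    parser.addSchemaField("ids", RecordFieldType.ARRAY, true);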


---


[GitHub] nifi pull request #3057: NIFI-5667: Add nested record support for PutORC

2018-10-09 Thread mattyb149
GitHub user mattyb149 opened a pull request:

https://github.com/apache/nifi/pull/3057

NIFI-5667: Add nested record support for PutORC

The basic approach here is that I removed all references to Avro 
schemas/fields and replaced them with NiFi Record API concepts. This lets us 
avoid switching back and forth, since Avro is not the de facto standard for 
schemas or flow file content (although we are still fairly tightly coupled 
to Avro schemas, but that's a different issue :) )

I'll comment in the PR on various parts of the code to note the "real" 
changes that fix the reported issue.

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced in 
the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mattyb149/nifi NIFI-5667

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3057


commit ab16d6f77c7dead859643492fe650685ffd7e4ba
Author: Matthew Burgess 
Date:   2018-10-09T22:59:00Z

NIFI-5667: Add nested record support for PutORC




---


[GitHub] nifi issue #2966: NIFI-5552 - Add option to normalize header column names in...

2018-10-09 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2966
  
I'm a little leery of automatically normalizing the names. In 
[NIFI-4612](https://issues.apache.org/jira/browse/NIFI-4612) we added the 
ability to disable name validation for the AvroSchemaRegistry, so there could 
be Avro-illegal field names but they'd be preserved. I think that came up for 
things like PutDatabaseRecord, where the field names needed to be preserved 
but were not Avro-valid. If we automatically normalize them, there might be 
unintended (and undesired) consequences for things like PutParquet and 
PutDatabaseRecord.

Perhaps instead of (or in addition to) normalization, we provide the option 
to "disable schema validation". This isn't technically Avro-specific, but it 
would be useful while we remain so coupled to Avro schemas in the meantime.
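
To make that concrete, a hedged sketch of what such an option could look 
like (property name and wording are hypothetical):

    public static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder()
            .name("csvreader-validate-header-field-names")
            .displayName("Validate Header Field Names")
            .description("If true, header-derived field names are validated against "
                    + "the schema rules; if false, they are used as-is, even when "
                    + "not Avro-valid.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();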


---


[GitHub] nifi issue #2997: NIFI-5583: Add cdc processor for MySQL referring to GTID.

2018-10-09 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2997
  
Do we need a separate processor for this, or can we just add GTID support 
to the existing one (perhaps with a property such as "Include GTID", 
defaulting to false but, if set to true, executing the GTID-related code)?
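
e.g. something along these lines (a hypothetical property, just to 
illustrate):

    public static final PropertyDescriptor INCLUDE_GTID = new PropertyDescriptor.Builder()
            .name("capture-change-mysql-include-gtid")
            .displayName("Include GTID")
            .description("If true, include the MySQL Global Transaction Identifier "
                    + "(GTID) in event records and execute the GTID-related code paths.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();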


---


[GitHub] nifi issue #3025: NIFI-5605 Added UpdateCassandra.

2018-10-09 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3025
  
Can PutCassandraRecord also be improved to allow inserts, updates, and 
deletes? I'm thinking PutCassandraRecord could be like PutDatabaseRecord. 
Doesn't PutCassandraQL already support updates, since the user provides the 
CQL to execute?
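
For example, PutCassandraRecord could take a PutDatabaseRecord-style 
property (hypothetical sketch):

    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder()
            .name("put-cassandra-record-statement-type")
            .displayName("Statement Type")
            .description("The type of CQL statement to generate for each incoming record.")
            .required(true)
            .allowableValues("INSERT", "UPDATE", "DELETE")
            .defaultValue("INSERT")
            .build();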


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-09 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223701761
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -192,15 +205,17 @@ public void onScheduled(final ProcessContext context) 
{
 @Override
 public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
 FlowFile fileToProcess = null;
+FlowFile inputFlowFile = null;
 if (context.hasIncomingConnection()) {
-fileToProcess = session.get();
+inputFlowFile = session.get();
 
 // If we have no FlowFile, and all incoming connections are 
self-loops then we can continue on.
 // However, if we have no FlowFile and we have connections 
coming from other Processors, then
 // we know that we should run only if we have a FlowFile.
-if (fileToProcess == null && context.hasNonLoopConnection()) {
+if (inputFlowFile == null && context.hasNonLoopConnection()) {
 return;
 }
+session.remove(inputFlowFile);
--- End diff --

I don't follow the logic here. If there is a flow file in the incoming 
connection, this appears to remove it from the session, and fileToProcess 
will always be null, which means we couldn't use flow file attributes to 
evaluate properties such as CQL Query, Query Timeout, Charset, etc. Another 
effect is that provenance/lineage will not be preserved for incoming files: 
the incoming file will be removed, and any flow files generated by this 
processor will appear to have been created here, so you can't track that a 
flow file "A" came in and, as a result, generated flow files X, Y, Z.
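
For comparison, what I'd expect here is closer to this (rough sketch, not 
a patch):

    FlowFile fileToProcess = null;
    if (context.hasIncomingConnection()) {
        fileToProcess = session.get();
        if (fileToProcess == null && context.hasNonLoopConnection()) {
            return;
        }
    }
    // Keep fileToProcess around so its attributes can drive CQL Query,
    // Query Timeout, Charset, etc., and create result flow files as
    // children, e.g. session.create(fileToProcess), so lineage links
    // back to the incoming file instead of removing it up front.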


---


[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...

2018-10-09 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3051#discussion_r223700156
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 ---
@@ -132,6 +131,20 @@
 
private final static List<PropertyDescriptor> propertyDescriptors;
 
+// Relationships
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
--- End diff --

These relationships are available (albeit with more generic descriptions) 
in the AbstractCassandraProcessor class. If we need more specific documentation 
then we should remove the common relationships and provide specific 
documentation/relationships in all Cassandra processors.


---


[GitHub] nifi issue #2974: NIFI-5533: Be more efficient with heap utilization

2018-10-09 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2974
  
+1 LGTM, ran full build with unit tests and tried many different scenarios 
with record processors, attributes, etc. Thanks for the improvements! Merging 
to master


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222757166
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.name(), "Least 
Connected", "Least Connected Strategy");

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222753864
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.summary.ResultSummary;
+import org.neo4j.driver.v1.summary.SummaryCounters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
--- End diff --

Since this processor can delete data from the target, I'm not sure 
SupportsBatching is appropriate here. Then again, I don't know enough about 
the nuances; perhaps @markap14 can chime in.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222758854
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
--- End diff --

For user-friendliness, couldn't we remove the onus from the user and just 
pick a dummy non-blank password if none was supplied in the property? Then it 
wouldn't need to be a required property.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222758485
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
--- End diff --

The Query is in Cypher, so my preference is for this property to refer to 
Cypher rather than Neo4J. See my other comments about making this processor 
more generic to support more openCypher platforms.

Also, would Cypher's syntax cause issues when evaluating Expression 
Language? It doesn't look like you're using parameters, so I wouldn't expect 
something like an "UNWIND $myList" or anything, but just in case, it might be 
a good idea to mention any caveats, limitations, etc. in the description of 
this property.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222756058
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.name(), "Least 
Connected", "Least Connected Strategy");

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222756361
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user. A dummy non-blank 
password is required even if it is disabled on the server.")
+.required(true)
+.sensitive(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.name(), "Least 
Connected", "Least Connected Strategy");

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-04 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r222760950
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --

What if the Cypher query was the content of the incoming flow file? 
ExecuteSQL makes this property not required, and if it's left empty, it 
assumes that the query is in the flow file content and uses that. I've seen 
some pretty large Cypher queries; in this case I'd need an ExtractText just 
to get the whole thing into an attribute so I could pass it back to this 
property via Expression Language.

It's not a total deal-breaker for me, just wanted to suggest/recommend it 
for user-friendliness. I know for ExecuteSQL that property is very often left 
blank, as something upstream generates the SQL as the body of the flow file.
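
Roughly the ExecuteSQL pattern I have in mind (a sketch against the 
existing QUERY property, not actual processor code):

    final String cypherQuery;
    if (context.getProperty(QUERY).isSet()) {
        cypherQuery = context.getProperty(QUERY)
                .evaluateAttributeExpressions(flowFile).getValue();
    } else {
        // Property left empty: treat the flow file content as the query
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFile, baos);
        cypherQuery = new String(baos.toByteArray(), StandardCharsets.UTF_8);
    }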


---


[GitHub] nifi issue #2945: NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRe...

2018-10-04 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2945
  
@pvillard31 @MikeThomsen Think we can get the final review done in time for 
1.8.0? Folks have been asking for these processors for a long time :)


---


[GitHub] nifi issue #3025: NIFI-5605 Added UpdateCassandra.

2018-10-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3025
  
Is there a reason we can't have delete/update capabilities in 
PutCassandraQL and/or PutCassandraRecord? Wondering why the need for a separate 
processor. If it does need to be separate, maybe UpdateCassandraRecord instead 
or in addition to?


---


[GitHub] nifi issue #3015: NIFI-5254 [WIP] Updated to Groovy 2.5.2

2018-10-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3015
  
Unfortunately, I believe you need to enumerate all the transitive 
dependencies in the L&N (LICENSE and NOTICE). You should have been able to 
copy that from the Groovy NOTICE, but it does not have its own dependencies 
listed in there (assuming some of the dependencies have NOTICEs), so we have 
to do it here. Their LICENSE mentions a couple of dependencies, but I'm not 
sure it's complete, so you'll have to verify that both our L&N include the 
appropriate entries.


---


[GitHub] nifi issue #2861: NIFI-5248 Added new Elasticsearch json and record processo...

2018-10-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/2861
  
Mind rebasing this one? I think you were waiting on the ES LookupService PR 
anyway, and now it's in :)


---


[GitHub] nifi issue #3042: NIFI-5650: Added Xerces to scripting bundle for Jython 2.7...

2018-10-01 Thread mattyb149
Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3042
  
Thanks! You might need to update your master before pushing; I brought in 
your ES Lookup Service PR (finally :P)


---


  1   2   3   4   5   6   7   8   9   10   >