[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r223046927
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
--- End diff --

My thought was that the query results will be the flow file content, while 
preserving the query as an attribute.  This way down stream processors can have 
access to both the results and the query for additional processing.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r223045178
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r223045802
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user. A dummy non-blank 
password is required even if it disabled on the server.")
+.required(true)
+.sensitive(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r223044277
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
--- End diff --

@mattyb149 - Neo4j defaults to and prefers enforcing authentication. But 
let me know if you think we should keep it optional.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r223043551
  
--- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/pom.xml 
---
@@ -0,0 +1,87 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-neo4j-bundle
+1.8.0-SNAPSHOT
+
+
+nifi-neo4j-processors
+jar
+
+
+ 
+ org.neo4j.driver
+ neo4j-java-driver
+ 1.6.2
+ compile
+
+
+org.apache.nifi
+nifi-api
+
+
+org.apache.nifi
+nifi-utils
+1.8.0-SNAPSHOT
+
+
+commons-io
+commons-io
+2.6
+
+
+org.apache.commons
+commons-lang3
+3.7
+
+
+com.google.code.gson
+gson
+2.7
+
+
+org.apache.nifi
+nifi-mock
+1.8.0-SNAPSHOT
+test
+
+
+com.google.guava
+guava
+18.0
--- End diff --

Will work on it.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-10-02 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221975264
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-30 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221470898
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
@MikeThomsen - Regarding your graph output questions - Can you please let 
me know what are the commands/files that were ingested for above output ?  


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221104284
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/test/java/org/apache/nifi/processors/neo4j/TestNeo4JCyperExecutor.java
 ---
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.Record;
+import org.neo4j.driver.v1.summary.ResultSummary;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Neo4J Cypher unit tests.
+ */
+public class TestNeo4JCyperExecutor {
--- End diff --

Correcting.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221104258
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221103863
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221103455
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221101649
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new 
AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round 
Robin Strategy");
+
+public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 
new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221100711
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
+.name("neo4j-password")
+.displayName("Password")
+.description("Password for Neo4J user")
+.required(true)
+.sensitive(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
--- End diff --

@alopresto - I can remove this.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-27 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r221100628
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
+.name("neo4j-connection-url")
+.displayName("Neo4j Connection URL")
+.description("Neo4J endpoing to connect to.")
+.required(true)
+.defaultValue("bolt://localhost:7687")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
+.name("neo4j-username")
+.displayName("Username")
+.description("Username for accessing Neo4J")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
--- End diff --

@MikeThomsen - Should I add a link to the documentation ?


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-22 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
Rebased against master.


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-09-15 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@ijokarumawak - Thanks for your comments.  I will checkout the pointers you 
have given.  Mans


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-15 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
@MikeThomsen - There is a docker compose file in the nifi-neo4j-examples 
project mentioned above.  Note that to avoid any conflict with standalone 
server - the ports exposed in docker compose are different from the default 
neo4j server.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-15 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r217887181
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.summary.ResultSummary;
+import org.neo4j.driver.v1.summary.SummaryCounters;
+
+import com.google.gson.Gson;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"neo4j", "graph", "network", "insert", "update", "delete", "put", 
"get", "node", "relationship", "connection", "executor"})
+@CapabilityDescription("This processor executes a Neo4J Query 
(https://www.neo4j.com/) defined in the 'Neo4j Query' property of the "
++ "FlowFile and writes the result to the FlowFile body in JSON format. 
The processor has been tested with Neo4j version 3.4.5")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.ERROR_MESSAGE, description = "Neo4J error message"),
+@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.LABELS_ADDED, 
description = "Number of labels added"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.NODES_CREATED, description = "Number of nodes 
created"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.NODES_DELETED, description = "Number of nodes 
deleted"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.PROPERTIES_SET, description = "Number of properties 
set"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.RELATIONS_CREATED, description = "Number of 
relationships created"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.RELATIONS_DELETED, description = "Number of 
relationships deleted"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.ROWS_RETURNED, description = "Number of rows 
returned"),
+})
+public class Neo4JCypherExecutor extends AbstractNeo4JCypherExecutor {
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+
+static {
+final Set t

[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-15 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r217887154
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Config;
+import org.neo4j.driver.v1.Config.ConfigBuilder;
+import org.neo4j.driver.v1.Config.LoadBalancingStrategy;
+import org.neo4j.driver.v1.Config.TrustStrategy;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+
+/**
+ * Abstract base class for Neo4JCypherExecutor processors
+ */
+abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor {
+
+protected static final PropertyDescriptor QUERY = new 
PropertyDescriptor.Builder()
+.name("neo4J-query")
+.displayName("Neo4J Query")
+.description("Specifies the Neo4j Query.")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONNECTION_URL = new 
PropertyDescriptor.Builder()
--- End diff --

@MikeThomsen - Just wanted to get some clarifications - Currently, the 
abstract base class has the code for getting the client.  Should I move that to 
a controller class ?  If you have any additional pointers, please let know.  


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-09-10 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@markap14 @ijokarumawak @jzonthemtn and Nifi Team: Please let me know if 
you have any additional comments for this processor.  Thanks.


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-06 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
@ottobackwards Thanks for your comments.  
@joewitt - Please let me know if you have any advice/pointers for me.


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-05 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
@ottobackwards - I've update the code/documentation based on your feedback. 
 Please let me know if you have any additional advice for me.
Mans


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r215476392
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/test/java/org/apache/nifi/processors/neo4j/ITNeo4JCyperExecutor.java
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.neo4j.driver.v1.AuthTokens;
+import org.neo4j.driver.v1.Driver;
+import org.neo4j.driver.v1.GraphDatabase;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
--- End diff --

I've added the steps to the javadoc for the integration tests.


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-05 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2956#discussion_r215476339
  
--- Diff: 
nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.neo4j;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.neo4j.driver.v1.Session;
+import org.neo4j.driver.v1.StatementResult;
+import org.neo4j.driver.v1.summary.ResultSummary;
+import org.neo4j.driver.v1.summary.SummaryCounters;
+
+import com.google.gson.Gson;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"neo4j", "graph", "network", "insert", "update", "delete", "put", 
"get", "node", "relationship", "connection", "executor"})
+@CapabilityDescription("This processor executes a Neo4J Query 
(https://www.neo4j.com/) defined in the 'Neo4j Query' property of the "
++ "FlowFile and writes the result to the FlowFile body in JSON 
format.")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.ERROR_MESSAGE, description = "Neo4J error message"),
+@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.LABELS_ADDED, 
description = "Number of labels added"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.NODES_CREATED, description = "Number of nodes 
created"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.NODES_DELETED, description = "Number of nodes 
deleted"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.PROPERTIES_SET, description = "Number of properties 
set"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.RELATIONS_CREATED, description = "Number of 
relationships created"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.RELATIONS_DELETED, description = "Number of 
relationships deleted"),
+@WritesAttribute(attribute = 
AbstractNeo4JCypherExecutor.ROWS_RETURNED, description = "Number of rows 
returned"),
+})
+public class Neo4JCypherExecutor extends AbstractNeo4JCypherExecutor {
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+
+static {
+final Set tempRelationships = new HashSet&l

[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-09-04 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
Hello @ottobackwards 

I've documented the steps for setting up neo4j and testing this processor 
in a project 
[nifi-neo4j-examples](https://github.com/mans2singh/nifi-flow-examples/tree/nifi-neo4j-examples)
 (branch nifi-neo4j-examples).  The project's 
[README.md](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-neo4j-examples/README.md)
 has the steps, along with input sample files (creating a node, select all the 
nodes, delete nodes, etc) and a NIFI Neo4J sample template.

The nifi-neo4j-processors project also has full suite of integration tests 
which can be executed after setting up neo4j.

Please let me know if this helps, If you have andy feedback or advice for 
me, please let me know.

Thanks again.

Mans


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-08-26 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
Hi @ottobackwards @joewitt 

I've renamed the bundle based on your feedback.  Please let me know if you 
have any more comments/feedback.

Thanks for your advice.


---


[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-08-20 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2956
  
@ottobackwards - I am open to changing the name to your recommendation.  
Mans


---


[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor

2018-08-19 Thread mans2singh
GitHub user mans2singh opened a pull request:

https://github.com/apache/nifi/pull/2956

NIFI-5537 Create Neo4J cypher execution processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [x] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mans2singh/nifi NIFI-5537

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2956.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2956


commit 1e35578166fd0fbd588a4fe06571d1f4d9efc3b1
Author: mans2singh 
Date:   2018-08-20T02:18:13Z

NIFI-5537 Create Neo4J cypher execution processor




---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-07-12 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
Hi @ijokarumawak - 

I've merged your changes.  Please let me know if you have any more 
recommendations.

Thanks for your help.

Mans 


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-07-06 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r200638844
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml
 ---
@@ -0,0 +1,114 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-deeplearning4j-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-deeplearning4j-processors
+jar
+
+
+
+org.nd4j
+nd4j-api
+1.0.0-alpha
+
+
+org.nd4j
+nd4j-native-platform
+1.0.0-alpha
+
+ 
+ org.nd4j
+ nd4j-cuda-8.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.1-platform
+1.0.0-alpha
+ 
--- End diff --

@ijokarumawak

I tried you changes locally and they look great.  Is it ok with you if I 
merge them into my branch ?

Once again, thanks for your help.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-07-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r199823948
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml
 ---
@@ -0,0 +1,114 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-deeplearning4j-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-deeplearning4j-processors
+jar
+
+
+
+org.nd4j
+nd4j-api
+1.0.0-alpha
+
+
+org.nd4j
+nd4j-native-platform
+1.0.0-alpha
+
+ 
+ org.nd4j
+ nd4j-cuda-8.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.1-platform
+1.0.0-alpha
+ 
--- End diff --

@ijokarumawak - Thanks for your updates, I will try them out locally.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-07-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r199823718
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml
 ---
@@ -0,0 +1,114 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-deeplearning4j-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-deeplearning4j-processors
+jar
+
+
+
+org.nd4j
+nd4j-api
+1.0.0-alpha
+
+
+org.nd4j
+nd4j-native-platform
+1.0.0-alpha
+
+ 
+ org.nd4j
+ nd4j-cuda-8.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.0-platform
+1.0.0-alpha
+ 
+ 
+ org.nd4j
+ nd4j-cuda-9.1-platform
+1.0.0-alpha
+ 
--- End diff --

@ijokarumawak - That sounds great, I will remove the remove cuda 
dependencies.


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-06-20 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@ijokarumawak - Just following-up:

1.  Regarding implementing record look up service - I believe that the 
processor and a record service lookup can be separate components useful for 
different use cases. As the file/rdbms based flows I've mentioned above, show - 
the processor can be used as a transformer.
2. Regarding providing more tools to prepare data - You are right, we can 
do that once we have the basics in place and there is a need for it. 
3. You had mentioned the concern of writing the results (predictions) in 
the body of the flow file - if you/community think we should keep the 
observations in the body and put the output in an attribute, I'd be happy to 
change that.

Thanks again for your thoughts and let me know if you have any more 
advice/recommendations.

Mans


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-06-20 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r196994774
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java
 ---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", 
"classification", "regression", "deep", "learning", "neural", "network"})
+@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one 
or more value(s) based on provided deeplearning4j 
(https://github.com/deeplearning4j) model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JMultiLayerPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected final Gson gson = new Gson();
+
+protected MultiLayerNetwork model = null;
+
+@OnStopped
+public v

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-06-17 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@ijokarumawak - 

I've created two flow templates for testing the DL4J processor.  

The first with a file input and file output. The second flow reads a row 
from rdbms based on id input file containing the single id, classifies the 
observation and save the classification results in same row.  

The supporting files along with a sample classification model that will 
work with the two templates are in the repository 
[nifi-flow-examples](https://github.com/mans2singh/nifi-flow-examples.git) 
branch nifi-dl4j-flow.  

Here are some details of the two flow templates:

1. The first template is simple one 
[NifiDL4JFileInputOutput.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileInputOutput.xml)
 that ingests file containing an observation record from directory (sample 
input in dl4jinput), applies the classification model and writes results to the 
output directory with the same file name as input.  In this scenario, the 
correlation is based on the file names.

2. The second 
[NifiDL4JFileToRdbms.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileToRdbms.xml)
 reads a single id from input file (sample in dl4jinputid directory), queries a 
rdbms table for the observations for the input id, classifies the observation 
and updates the db row with classification results.  In this flow template, the 
row id of the input is used as a correlation id which is used to update the 
output column of the corresponding row after the classification is done. The 
flow uses other Nifi processors to ingest, transform, save the classification 
results.  The table creation and observation row insertion commands are in 
dl4jsql directory. 

The flow templates require setting the appropriate input/output files, dl4j 
model, db controller and rdbms table with the records.

Please let me know your thoughts/feedback.

Thanks

Mans


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-06-12 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@joewitt - I will remove the nar from the assembly.  Let me know if there 
is any additional feedback.  Thanks.


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-06-12 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@ijokarumawak - I was thinking that the processor will be used as a 
transformer (predictor) and there would be a correlation attribute in the flow 
file which would be used to associate the results with the observations.  This 
will keep the focus on transformation with simple outputs while still allowing 
the user the flexibility to use the correlation id to combine/enrich it with 
other data using a enrichment processor.   I've added test cases which show how 
to use correlation id.  What's your thought ?


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-06-12 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r194944047
  
--- Diff: nifi-assembly/pom.xml ---
@@ -379,6 +379,12 @@ language governing permissions and limitations under 
the License. -->
 1.7.0-SNAPSHOT
 nar
 
+
+org.apache.nifi
+nifi-deeplearning4j-nar
+1.7.0-SNAPSHOT
+nar
+
--- End diff --

I would be happy to create a profile if that is the decision.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-06-12 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r194943820
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt
 ---
@@ -0,0 +1,100 @@
+1.1,0.5,0.5,0.2,0
--- End diff --

@ijokarumawak - The tests included are to show how we can generate a model 
and configure the processor for regression and classification in the 
integration tests.  My thought while creating the mock data was create a the 
model with reproducible results even with limited observations and few 
iterations.  In real life, a multilayer model will be created, tested and 
validated by the user prior to plugging it into the component. 


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-06-10 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@ijokarumawak - 
Thanks for your feedback.  I was away for a few days and will respond to 
your comments soon.
Mans


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-05-21 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@markap14, NIFI team

Just wondering if you have any feedback on this processor.  

Thanks


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187663098
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected Gson gson = new Gson();
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+static {
+final Set tempRelationships = new HashSet<>();
+tempRelationships.add(REL_SUCCESS);
+tempRelationships.add(REL_FAILURE);
+relatio

[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-05-11 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
@markap14 - 

Thanks for your prompt review and advice.  

I've updated the code based on your review and am looking forward to 
your/other members feedback.

Thanks again.

Mans


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187655735
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected Gson gson = new Gson();
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+static {
+final Set tempRelationships = new HashSet<>();
+tempRelationships.add(REL_SUCCESS);
+tempRelationships.add(REL_FAILURE);
+relatio

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187655641
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors
+ */
+public abstract class AbstractDeepLearning4JProcessor extends 
AbstractProcessor {
--- End diff --

@markap14  - This is to establish a foundation for future extensions which 
will be easier if some common base classes are present.   I found this pattern 
to be useful and hope it's not an overkill.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187655109
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt
 ---
@@ -0,0 +1,100 @@
+1.1,0.5,0.5,0.2,0
--- End diff --

I've mentioned at the beginning of the tests that these are based on 
deeplearning4j examples.  I generated this very simple/small sample file to 
make consistent predictions for the tests even with just a few observations.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187654464
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected Gson gson = new Gson();
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+static {
+final Set tempRelationships = new HashSet<>();
+tempRelationships.add(REL_SUCCESS);
+tempRelationships.add(REL_FAILURE);
+relatio

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187654383
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected Gson gson = new Gson();
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+static {
+final Set tempRelationships = new HashSet<>();
+tempRelationships.add(REL_SUCCESS);
+tempRelationships.add(REL_FAILURE);
+relatio

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187654258
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful DeepLearning4j results are routed to 
this relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Failed DeepLearning4j results are routed to this 
relationship").build();
+
+protected Gson gson = new Gson();
+
+private static final Set relationships;
+private static final List propertyDescriptors;
+static {
+final Set tempRelationships = new HashSet<>();
+tempRelationships.add(REL_SUCCESS);
+tempRelationships.add(REL_FAILURE);
+relatio

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187654197
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.nd4j.linalg.api.ndarray.INDArray;
+import org.nd4j.linalg.factory.Nd4j;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"deeplearning4j", "dl4j", "predict", "classification", 
"regression", "deep", "learning"})
+@CapabilityDescription("The DeepLearning4JPredictor predicts one or more 
value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) 
model and the content of a FlowFile. "
++ "The processor supports both classification and regression by 
extracting the record from the FlowFile body and applying the model. "
++ "The processor supports batch by allowing multiple records to be 
passed in the FlowFile body with each record separated by the 'Record 
Separator' property. "
++ "Each record can contain multiple fields with each field separated 
by the 'Field Separator' property."
+)
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = 
"Deeplearning4J error message"),
+@WritesAttribute(attribute = 
AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = 
"Deeplearning4J output shape"),
+})
+public class DeepLearning4JPredictor extends 
AbstractDeepLearning4JProcessor {
--- End diff --

This is the intuitive name I could come up with.  Please let me know if you 
have any other recommendations.


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187653950
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors
+ */
+public abstract class AbstractDeepLearning4JProcessor extends 
AbstractProcessor {
+
+public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-charset")
+.displayName("Character Set")
+.description("Specifies the character set of the document 
data.")
+.required(true)
+.defaultValue("UTF-8")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor FIELD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-field-separator")
+.displayName("Field Separator")
+.description("Specifies the field separator in the records. 
(default is comma)")
+.required(true)
+.defaultValue(",")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-record-separator")
+.displayName("Record Separator")
+.description("Specifies the records separator in the message 
body. (defaults to new line)")
+.required(true)
+.defaultValue(System.lineSeparator())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MODEL_FILE = new 
PropertyDescriptor.Builder()
+.name("model-file")
+.displayName("Model File")
+.description("Location of the Deeplearning4J model zip file")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_DIMENSIONS = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-record-dimension")
+.displayName("Record dimensions separated by field separator")
+.description("Dimension of array in each a record (eg: 2,4 - a 
2x4 array)")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final String DEEPLEARNING4J_ERROR_MESSAGE = 
"deeplear

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187653982
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors
+ */
+public abstract class AbstractDeepLearning4JProcessor extends 
AbstractProcessor {
+
+public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-charset")
+.displayName("Character Set")
+.description("Specifies the character set of the document 
data.")
+.required(true)
+.defaultValue("UTF-8")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor FIELD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-field-separator")
+.displayName("Field Separator")
+.description("Specifies the field separator in the records. 
(default is comma)")
+.required(true)
+.defaultValue(",")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-record-separator")
+.displayName("Record Separator")
+.description("Specifies the records separator in the message 
body. (defaults to new line)")
+.required(true)
+.defaultValue(System.lineSeparator())
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MODEL_FILE = new 
PropertyDescriptor.Builder()
+.name("model-file")
+.displayName("Model File")
+.description("Location of the Deeplearning4J model zip file")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_DIMENSIONS = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-record-dimension")
+.displayName("Record dimensions separated by field separator")
+.description("Dimension of array in each a record (eg: 2,4 - a 
2x4 array)")
+.required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final String DEEPLEARNING4J_ERROR_MESSAGE = 
&

[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187653539
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors
+ */
+public abstract class AbstractDeepLearning4JProcessor extends 
AbstractProcessor {
+
+public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-charset")
+.displayName("Character Set")
+.description("Specifies the character set of the document 
data.")
+.required(true)
+.defaultValue("UTF-8")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor FIELD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-field-separator")
+.displayName("Field Separator")
+.description("Specifies the field separator in the records. 
(default is comma)")
+.required(true)
+.defaultValue(",")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-record-separator")
+.displayName("Record Separator")
+.description("Specifies the records separator in the message 
body. (defaults to new line)")
--- End diff --

Corrected


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-11 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2686#discussion_r187653485
  
--- Diff: 
nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java
 ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.deeplearning4j;
+import java.io.IOException;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
+import org.deeplearning4j.util.ModelSerializer;
+
+/**
+ * Base class for deeplearning4j processors
+ */
+public abstract class AbstractDeepLearning4JProcessor extends 
AbstractProcessor {
+
+public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-charset")
+.displayName("Character Set")
+.description("Specifies the character set of the document 
data.")
+.required(true)
+.defaultValue("UTF-8")
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor FIELD_SEPARATOR = new 
PropertyDescriptor.Builder()
+.name("deeplearning4j-field-separator")
+.displayName("Field Separator")
+.description("Specifies the field separator in the records. 
(default is comma)")
--- End diff --

Corrected.


---


[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...

2018-05-08 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2686
  
Good Morning Nifi Folks:

The appveyor build is passing for this PR but travis-ci build is failing 
with the following message:

`[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.20.1:test (default-test) on 
project nifi-cdc-mysql-processors: There are test failures.
[ERROR] 
[ERROR] Please refer to 
/home/travis/build/apache/nifi/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/target/surefire-reports
 for the individual test results.
[ERROR] Please refer to dump files (if any exist) [date]-jvmRun[N].dump, 
[date].dumpstream and [date]-jvmRun[N].dumpstream.`

Can you please advice on how to resolve this error ?

Thanks

Mans


---


[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...

2018-05-07 Thread mans2singh
GitHub user mans2singh opened a pull request:

https://github.com/apache/nifi/pull/2686

NIFI-5166 - Deep learning classification and regression processor wit…

…h deeplearning4j

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ x] Is your initial contribution a single, squashed commit?

### For code changes:
- [ x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ x] Have you written or updated unit tests to verify your changes?
- [ x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ x] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mans2singh/nifi NIFI-5166

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2686


commit f58316a906af820f9f056c6ebee171015685f86b
Author: mans2singh <mans2singh@...>
Date:   2018-05-08T05:11:00Z

NIFI-5166 - Deep learning classification and regression processor with 
deeplearning4j




---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-11 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@MikeThomsen  @joewitt - Thanks for your help in making this contribution 
possible.

@timhallinflux - Please let me know if you have any additional enhancements 
possible.  I will contact you 
via email.

Thanks again everyone.


---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-10 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@MikeThomsen - 

I've updated the code (added expression language scope and updated tests) 
based your review comments.  The integration tests are passing.  

Please let me know if there is any other comment.

Thanks

Mans


---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-05 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
Thanks @MikeThomsen for your reveiw/advice.  Also thanks @timhallinflux for 
the pointers on influxdb sandbox.

Please let me know if there is other feedback.

Mans


---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-03 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@MikeThomsen - 

I have updated the test cases regarding usage of assertions.  Let me know 
if there is anything else outstanding.

Thanks


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r179012098
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 ---
@@ -110,8 +125,72 @@ public void testCreateDB() {
 assertEquals("Value should be equal",null, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
 assertEquals("Value should be equal",query, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
 
-flowFiles.get(0).assertContentEquals("{\"results\":[{}]}");
+QueryResult queryResult = gson.fromJson(new StringReader(new 
String(flowFiles.get(0).toByteArray())), QueryResult.class);
+assertEquals("results array should be empty", 1, 
queryResult.getResults().size());
+assertEquals("No series", null, 
queryResult.getResults().get(0).getSeries());
--- End diff --

Corrected.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r179012073
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 ---
@@ -110,8 +125,72 @@ public void testCreateDB() {
 assertEquals("Value should be equal",null, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
 assertEquals("Value should be equal",query, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
 
-flowFiles.get(0).assertContentEquals("{\"results\":[{}]}");
+QueryResult queryResult = gson.fromJson(new StringReader(new 
String(flowFiles.get(0).toByteArray())), QueryResult.class);
+assertEquals("results array should be empty", 1, 
queryResult.getResults().size());
--- End diff --

Corrected.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r179012036
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 ---
@@ -110,8 +125,72 @@ public void testCreateDB() {
 assertEquals("Value should be equal",null, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
 assertEquals("Value should be equal",query, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
 
-flowFiles.get(0).assertContentEquals("{\"results\":[{}]}");
+QueryResult queryResult = gson.fromJson(new StringReader(new 
String(flowFiles.get(0).toByteArray())), QueryResult.class);
+assertEquals("results array should be empty", 1, 
queryResult.getResults().size());
+assertEquals("No series", null, 
queryResult.getResults().get(0).getSeries());
+}
+
+@Test
+public void testEmptyFlowFileQueryWithScheduledQuery() {
+String message = "water,country=US,city=newark rain=1,humidity=0.6 
1501002274856668652";
+influxDB.write(dbName, DEFAULT_RETENTION_POLICY, 
InfluxDB.ConsistencyLevel.ONE, message);
+
+String query = "select * from water";
+runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
+
+byte [] bytes = new byte [] {};
+runner.enqueue(bytes);
+runner.run(1,true,true);
+
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
+
+List flowFiles = 
runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
+assertEquals("Value should be equal", 1, flowFiles.size());
+assertEquals("Value should be equal",null, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
--- End diff --

Corrected.


---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@MikeThomsen - 

I've updated the code based on your comments.  Let me know if you have any 
more recommendations.

Thanks


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178480359
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import static org.junit.Assert.assertEquals;
+import org.junit.Assert;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.QueryResult;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for executing InfluxDB queries. Please ensure that the 
InfluxDB is running
+ * on local host with default port and has database test with table test. 
Please set user
+ * and password if applicable before running the integration tests.
+ */
+public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
+
+@Before
+public void setUp() throws Exception {
+initInfluxDB();
+runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class);
+initializeRunner();
+}
+
+@Test
+public void testValidScheduleQueryWithNoIncoming() {
+String message = "water,country=US,city=newark rain=1,humidity=0.6 
1501002274856668652";
+influxDB.write(dbName, DEFAULT_RETENTION_POLICY, 
InfluxDB.ConsistencyLevel.ONE, message);
+
+String query = "select * from water";
+runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
+
+runner.setIncomingConnection(false);
+runner.run(1,true,true);
+
runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
+List flowFiles = 
runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
+assertEquals("Value should be equal", 1, flowFiles.size());
+assertEquals("Value should be equal",null, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
+assertEquals("Value should be equal",query, 
flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
+flowFiles.get(0).assertContentEquals(
--- End diff --

Updated to parse json and compare typed results.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178480131
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178480144
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.util.TestRunner;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.junit.After;
+
+/**
+ * Base integration test class for InfluxDB processors
+ */
+public class AbstractITInfluxDB {
+protected TestRunner runner;
+protected InfluxDB influxDB;
+protected String dbName = "test";
+protected String dbUrl = "http://localhost:8086;;
+protected String user = "admin";
+protected String password = "admin";
+protected static final String DEFAULT_RETENTION_POLICY = "autogen";
+
+protected void initInfluxDB() throws InterruptedException, Exception {
+influxDB = InfluxDBFactory.connect(dbUrl,user,password);
+cleanUpDatabase();
--- End diff --

Removed as recommended.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178480116
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178480026
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178479805
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178479729
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178479713
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-04-01 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r178479664
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile (preferred) or a scheduled query.  Please check details 
of the supported queries in InfluxDB documentation 
(https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor INFLUX_DB_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-query")
+.displayName("InfluxDB Query")
+.description("The InfluxDB query to execute. "
++ "Note: If there are incoming connections, then the 
query is created from incoming FlowFile's content an

[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-26 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@MikeThomsen - I've updated the code based on your comments (added check 
for query result null, changed scheduled query property name).  One note, if we 
use the influxdb-compose.xml file for integration testing, we will need to 
change the name of the db (from test to something else) which the curl loader 
uses, since integration test use their own data.

Please let me know your thoughts/recommendations.

Thanks.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-26 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r177296540
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-26 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r177296457
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
 ---
@@ -36,14 +36,7 @@
 
 protected void initInfluxDB() throws InterruptedException, Exception {
 influxDB = InfluxDBFactory.connect(dbUrl,user,password);
-if ( influxDB.databaseExists(dbName) ) {
-QueryResult result = influxDB.query(new Query("DROP 
measurement water", dbName));
-checkError(result);
-result = influxDB.query(new Query("DROP measurement testm", 
dbName));
-checkError(result);
-result = influxDB.query(new Query("DROP database " + dbName, 
dbName));
-Thread.sleep(1000);
-}
+cleanUpDatabase();
--- End diff --

I call the cleanup in setup as a precaution so that if there is any 
conflicting/previously existing data in the test database, it is removed and 
does not fail the integration test which depend on number of rows inserted.  If 
you think it is unnecessary, I can remove it.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-26 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r177296275
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -72,6 +74,16 @@
 .sensitive(false)
 .build();
 
+public static final PropertyDescriptor INFLUX_DB_SCHEDULED_QUERY = new 
PropertyDescriptor.Builder()
+.name("influxdb-scheduled-query")
+.displayName("InfluxDB Schedued Query")
--- End diff --

Changed the attribute to INFLUX_DB_QUERY.  The description mentions that 
flow files and timed query both are allowed.


---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
@joewitt @MikeThomsen - 

I've updated the code based on your comments.  

I've also added a docker gist 
[influxdb-compose.xml](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-influxdb-compose-xml)
 based on docker compose xml mentioned by @MikeThomsen .  This docker compose 
starts a local influxdb and runs curl commands to create a `test` database and 
populate it with a metric every 5 seconds.  To launch it we need to run 
`docker-compose -f  up` on the local machine and then we can run the 
integration tests for the query processor or run Nifi locally with processor 
configured for scheduled query.

I am also including two templates ([flow file 
driven](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-nifi-influxdb-flow-file-driven-template)
 and [timer 
driven](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-nifi-influxdb-scheduled-query-template))
 to assist in testing the InfluxDB query processor.

Please let me know if your comments/recommendations.

Thanks again.




---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974725
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.util.TestRunner;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import org.junit.After;
+
+/**
+ * Base integration test class for InfluxDB processors
+ */
+public class AbstractITInfluxDB {
+protected TestRunner runner;
+protected InfluxDB influxDB;
+protected String dbName = "test";
+protected String dbUrl = "http://localhost:8086;;
+protected String user = "admin";
+protected String password = "admin";
+protected static final String DEFAULT_RETENTION_POLICY = "autogen";
+
+protected void initInfluxDB() throws InterruptedException, Exception {
+influxDB = InfluxDBFactory.connect(dbUrl,user,password);
+if ( influxDB.databaseExists(dbName) ) {
--- End diff --

I've refactored the code and call it both from `Before` and `After` to make 
sure that the test setup and tear down is clean.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974663
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import static org.junit.Assert.assertEquals;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.QueryResult;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test for executing InfluxDB queries. Please ensure that the 
InfluxDB is running
+ * on local host with default port and has database test with table test. 
Please set user
+ * and password if applicable before running the integration tests.
+ */
+public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
+
+@Before
+public void setUp() throws Exception {
+runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class);
+initializeRunner();
--- End diff --

I've changed the order and call init database before initializing runner.  
Let me know if that is ok.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974557
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
--- End diff --

Added support for timer based queries based on ExecuteSQL processesor as 
you had recommended and updated test cases.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974503
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
--- End diff --

Influx queries supports data queries, dml and schema exploration.  I've 
added integration tests for these.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974419
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
--- End diff --

Let me know if you have any recommendation for this.


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974380
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974361
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974332
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974260
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-25 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2562#discussion_r176974126
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java
 ---
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the 
content of a FlowFile.  Please check details of the supported queries in 
InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+@WritesAttribute(attribute = 
ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed 
query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+public static final String INFLUX_DB_EXECUTED_QUERY = 
"influxdb.executed.query";
+
+public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT 
= new PropertyDescriptor.Builder()
+.name("influxdb-query-result-time-unit")
+.displayName("Query Result Time Units")
+.description("The time unit of query results from the 
InfluxDB")
+.defaultValue(TimeUnit.NANOSECONDS.name())
+.required(true)
+.expressionLanguageSupported(true)
+.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> 
v.name()).collect(Collectors.toSet()))
+.sensitive(false)
+.build();
+
+static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+.description("Successful InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+.description("Falied InfluxDB queries are routed to this 
relationship").build();
+
+static final Relationship REL_RETRY = new 
Relationship.Builder().name("retry")
+

[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-24 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
Hi @MikeThomsen - Thanks for your review and comments.  

I will work on your and @joewitt 's recommendation this weekend.



---


[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-19 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2562
  
Hi Nifi Team:

Please let me know your suggestions/recommendations on this InfluxDB Query 
Processsor.

Thanks


---


[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor

2018-03-17 Thread mans2singh
GitHub user mans2singh opened a pull request:

https://github.com/apache/nifi/pull/2562

NIFI-4927 - InfluxDB Query Processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [x] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [x] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [x] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [x] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mans2singh/nifi NIFI-4927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2562


commit 7c607eacb1aadb15295bc646ba380fed952cce6e
Author: mans2singh <mans2singh@...>
Date:   2018-03-18T01:50:08Z

NIFI-4927 - InfluxDB Query Processor




---


[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-24 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
Thanks @MikeThomsen @pvillard31 @mattyb149 @joewitt @jskora and everyone 
for your advice/support.


---


[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-23 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
@pvillard31 @mattyb149 @MikeThomsen 

I've added expression language support for username and password.

Please let me know if there is any other recommendation.

Thanks

Mans


---


[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-22 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
@pvillard31 - 

You were right - After renaming the integration tests they don't get 
executed by default mvn install.  Also removed test profile from pom.xml as 
recommended.

Please let me know if you have any additional recommendation.

Thanks again.


---


[GitHub] nifi pull request #2101: NIFI-4289 - InfluxDB put processor

2018-02-22 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2101#discussion_r169968256
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml ---
@@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-influxdb-bundle
+1.6.0-SNAPSHOT
+
+
+nifi-influxdb-processors
+jar
+
+
+   
+   org.influxdb
+   influxdb-java
+
+
+org.apache.commons
+commons-lang3
+
+
+org.apache.nifi
+nifi-api
+
+
+org.apache.nifi
+nifi-utils
+
+
+org.apache.nifi
+nifi-mock
+test
+
+
+org.slf4j
+slf4j-simple
+test
+
+
+junit
+junit
+test
+
+
+com.google.guava
+guava
+test
+
+
+
+
+default
+
+true
+
+
+
+
+org.apache.maven.plugins
+maven-surefire-plugin
+
+
+**/IT*.java
--- End diff --

@pvillard31 

If I remove this profile/configuration from the pom.xml and run `mvn clean 
install` in that directory, the integration tests get executed and fail if 
InfluxDb is not running on the local server.  

```
Tests in error: 
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
  ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: 
Failed to c...
```

Can you please let me know how I can skip running the integration tests 
without using this profile ?

Thanks for @MikeThomsen and your feedback.  


---


[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-07 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
Hi @MikeThomsen @mattyb149 @joewitt 

I believe I have implemented all the review changes.  Please let me know if 
there is anything I have missed or you have any additional recommendations.

Thanks for your feedback.




---


[GitHub] nifi pull request #2101: NIFI-4289 - InfluxDB put processor

2018-02-03 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2101#discussion_r165818221
  
--- Diff: 
nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.InfluxDB;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"})
+@CapabilityDescription("Processor to write the content of a FlowFile (in 
line protocol 
https://docs.influxdata.com/influxdb/v1.3/write_protocols/line_protocol_tutorial/)
 to InfluxDB (https://www.influxdb.com/). "
++ "  The flow file can contain single measurement point or 
multiple measurement points separated by line seperator.  The timestamp (last 
field) should be in nano-seconds resolution.")
+@WritesAttributes({
+@WritesAttribute(attribute = 
AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB 
error message"),
+})
+public class PutInfluxDB extends AbstractInfluxDBProcessor {
+
+public static AllowableValue CONSISTENCY_LEVEL_ALL = new 
AllowableValue("ALL", "All", "Return success when all nodes have responded with 
write success");
+public static AllowableValue CONSISTENCY_LEVEL_ANY = new 
AllowableValue("ANY", "Any", "Return success when any nodes have responded with 
write success");
+public static AllowableValue CONSISTENCY_LEVEL_ONE = new 
AllowableValue("ONE", "One", "Return success when one node has responded with 
write success");
+public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new 
AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes 
have responded with write success");
+
+public static final PropertyDescriptor CONSISTENCY_LEVEL = new 
PropertyDescriptor.Builder()
+.name("influxdb-consistency-level")
+.displayName("Consistency Level")
+.description("InfluxDB consistency level")
+.required(true)
+.defaultValue(CONSISTENCY_LEVEL_ONE.getValue())
+.expressionLanguageSupported(true)
+.allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, 
CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM)
+.build();
+
+public static final PropertyDescriptor RETENTION_P

[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-03 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
@MikeThomsen - 

If the fields, tags and timestamp for a measurement are the same, they are 
considered to be the same record. 

Regarding size limit - I did not find any mention of size limit for posting 
data in the influxdb docs. I think this all depends on use cases and with the 
size limit processor property available, the nifi admin configure the values 
based on their influx db cluster and load.

Let me know if I have missed anything or anything else required.

Thanks again for your advice/comments.


---


[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor

2018-02-02 Thread mans2singh
Github user mans2singh commented on the issue:

https://github.com/apache/nifi/pull/2101
  
@mattyb149 @MikeThomsen 

Please let me know if you have any addiional comments/recommendations for 
this processor.

Thanks


---


  1   2   >