Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r216783486 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + + protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() + .name("neo4J-query") + .displayName("Neo4J Query") + .description("Specifies the Neo4j Query.") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() + .name("neo4j-connection-url") + .displayName("Neo4j Connection URL") + .description("Neo4J endpoing to connect to.") + .required(true) + .defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder() + .name("neo4j-username") + .displayName("Username") + .description("Username for accessing Neo4J") + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("neo4j-password") + .displayName("Password") + .description("Password for Neo4J user") + .required(true) + .sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + + public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.name(), "Least Connected", "Least Connected Strategy"); + + protected static final PropertyDescriptor LOAD_BALANCING_STRATEGY = new PropertyDescriptor.Builder() + .name("neo4j-load-balancing-strategy") + .displayName("Load Balancing Strategy") + .description("Load Balancing Strategy (Round Robin or Least Connected)") + .required(false) + .defaultValue(LOAD_BALANCING_STRATEGY_ROUND_ROBIN.getValue()) + .allowableValues(LOAD_BALANCING_STRATEGY_ROUND_ROBIN, LOAD_BALANCING_STRATEGY_LEAST_CONNECTED) + .build(); + + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("neo4j-max-connection-time-out") + .displayName("Neo4J Max Connection Time Out (seconds)") + .description("The maximum time for establishing connection to the Neo4j") + .defaultValue("5 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_CONNECTION_POOL_SIZE = new PropertyDescriptor.Builder() + 
.name("neo4j-max-connection-pool-size") + .displayName("Neo4J Max Connection Pool Size") + .description("The maximum connection pool size for Neo4j.") + .defaultValue("100") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_CONNECTION_ACQUISITION_TIMEOUT = new PropertyDescriptor.Builder() + .name("neo4j-max-connection-acquisition-timeout") + .displayName("Neo4J Max Connection Acquisition Timeout") + .description("The maximum connection acquisition timeout.") + .defaultValue("60 second") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor IDLE_TIME_BEFORE_CONNECTION_TEST = new PropertyDescriptor.Builder() + .name("neo4j-idle-time-before-test") + .displayName("Neo4J Idle Time Before Connection Test") + .description("The idle time before connection test.") + .defaultValue("60 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_CONNECTION_LIFETIME = new PropertyDescriptor.Builder() + .name("neo4j-max-connection-lifetime") + .displayName("Neo4J Max Connection Lifetime") + .description("The maximum connection lifetime") + .defaultValue("3600 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor ENCRYPTION = new PropertyDescriptor.Builder() + .name("neo4j-encryption") + .displayName("Neo4J Encrytion") + .description("Is connection encrypted") + .defaultValue("true") + .required(true) + .allowableValues("true","false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .sensitive(false) + .build(); + + public static AllowableValue TRUST_SYSTEM_CA_SIGNED_CERTIFICATES = + new 
AllowableValue(TrustStrategy.Strategy.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES.name(), + "Trust System CA Signed Certificates", "Trust system specified CA signed certificates"); + + public static AllowableValue TRUST_CUSTOM_CA_SIGNED_CERTIFICATES = + new AllowableValue(TrustStrategy.Strategy.TRUST_CUSTOM_CA_SIGNED_CERTIFICATES.name(), + "Trust Custom CA Signed Certificates", "Trust custom CA signed certificates defined in the file"); + + public static AllowableValue TRUST_ALL_CERTIFICATES = + new AllowableValue(TrustStrategy.Strategy.TRUST_ALL_CERTIFICATES.name(), + "Trust All Certificates", "Trust all certificate"); + + protected static final PropertyDescriptor TRUST_STRATEGY = new PropertyDescriptor.Builder() + .name("neo4j-trust-strategy") + .displayName("Trust Strategy") + .description("Trust Strategy (Trust All Certificates, System CA Signed Certificates or Custom CA Signed Certificates)") + .required(false) + .defaultValue(TRUST_ALL_CERTIFICATES.getValue()) + .allowableValues(TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES) + .build(); + + protected static final PropertyDescriptor TRUST_CUSTOM_CA_SIGNED_CERTIFICATES_FILE = new PropertyDescriptor.Builder() + .name("neo4j-custom-ca-strategy-certificates-file") + .displayName("Custom Trust CA Signed Certificates File") + .description("Custom file containing CA signed certificates to be trusted.") + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Sucessful FlowFiles are routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed FlowFiles are routed to this relationship").build(); + + public static final String ERROR_MESSAGE = "neo4j.error.message"; + public static final 
String NODES_CREATED= "neo4j.nodes.created"; + public static final String RELATIONS_CREATED = "neo4j.relations.created"; + public static final String LABELS_ADDED = "neo4j.labels.added"; + public static final String NODES_DELETED = "neo4j.nodes.deleted"; + public static final String RELATIONS_DELETED = "neo4j.relations.deleted"; + public static final String PROPERTIES_SET = "neo4j.properties.set"; + public static final String ROWS_RETURNED = "neo4j.rows.returned"; + + protected Driver neo4JDriver; + protected String username; + protected String password; + protected String connectionUrl; + protected Integer port; + protected LoadBalancingStrategy loadBalancingStrategy; + + /** + * Helper method to help testability + * @return Driver instance + */ + protected Driver getNeo4JDriver() { + return neo4JDriver; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + try { + neo4JDriver = getDriver(context); + } catch(Exception e) { + getLogger().error("Error while getting connection " + e.getLocalizedMessage(),e); + throw new RuntimeException("Error while getting connection" + e.getLocalizedMessage(),e); + } + getLogger().info("Neo4JCypherExecutor connection created for url {}", + new Object[] {connectionUrl}); + } + + protected Driver getDriver(ProcessContext context) { + connectionUrl = context.getProperty(CONNECTION_URL).evaluateAttributeExpressions().getValue(); + username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + + ConfigBuilder configBuilder = Config.build(); + String loadBalancingStrategyValue = context.getProperty(LOAD_BALANCING_STRATEGY).getValue(); + if ( ! 
StringUtils.isBlank(loadBalancingStrategyValue) ) { + configBuilder = configBuilder.withLoadBalancingStrategy( + LoadBalancingStrategy.valueOf(loadBalancingStrategyValue)); + } + + configBuilder.withMaxConnectionPoolSize(context.getProperty(MAX_CONNECTION_POOL_SIZE).asInteger()); + + configBuilder.withConnectionAcquisitionTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS), TimeUnit.SECONDS); + + configBuilder.withMaxConnectionLifetime(context.getProperty(MAX_CONNECTION_ACQUISITION_TIMEOUT).asTimePeriod(TimeUnit.SECONDS), TimeUnit.SECONDS); + + configBuilder.withConnectionLivenessCheckTimeout(context.getProperty(IDLE_TIME_BEFORE_CONNECTION_TEST).asTimePeriod(TimeUnit.SECONDS), TimeUnit.SECONDS); + + if ( context.getProperty(ENCRYPTION).asBoolean() ) --- End diff -- Curly brackets are needed around the body of this `if` statement.
---