[ https://issues.apache.org/jira/browse/NIFI-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196190#comment-15196190 ]
ASF GitHub Bot commented on NIFI-901: ------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56235566 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. 
+ */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + + public static final int DEFAULT_CASSANDRA_PORT = 9042; + + private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final List<String> esList = Arrays.asList(input.split(",")); + for (String hostnamePort : esList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)") + .valid(false).build(); + } + // Validate the port + String port = addresses[1].trim(); + ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context); + if (!portValidatorResult.isValid()) { + return portValidatorResult; + } + + } + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Valid cluster definition").valid(true).build(); + } + }; + + // Common descriptors + public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() + .name("Cassandra Contact Points") + .description("Contact points are addresses of Cassandra nodes. The list of contact points should be " + + "comma-separated and in hostname:port format. Example node1:port,node2:port,...." + + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(HOSTNAME_PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder() + .name("Keyspace") + .description("The Cassandra Keyspace to connect to. 
If no keyspace is specified, the query will need to " + + "include the keyspace name before any table reference.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("Client Auth") + .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " + + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " + + "has been defined and enabled.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue("REQUIRED") + .build(); + + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username to access the Cassandra cluster") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password to access the Cassandra cluster") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder() + .name("Consistency Level") + .description("The strategy for how many replicas must respond before results are returned.") + .required(true) + .allowableValues(ConsistencyLevel.values()) + .defaultValue("ONE") + .build(); + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set 
of the record data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result set.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") --- End diff -- The descriptions for the REL_SUCCESS and REL_FAILURE relationships reference SQL. I don't believe that's correct for a Cassandra processor, since Cassandra queries are written in CQL rather than SQL. > Create processors to get/put data with Apache Cassandra > ------------------------------------------------------- > > Key: NIFI-901 > URL: https://issues.apache.org/jira/browse/NIFI-901 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Joseph Witt > Assignee: Matt Burgess > Fix For: 0.6.0 > > > Develop processors to interact with Apache Cassandra. The current HTTP processors may actually support this as is, but such configuration may be too complex to provide the quality user experience desired. -- This message was sent by Atlassian JIRA (v6.3.4#6332)