[ 
https://issues.apache.org/jira/browse/NIFI-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15196190#comment-15196190
 ] 

ASF GitHub Bot commented on NIFI-901:
-------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/237#discussion_r56235566
  
    --- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 ---
    @@ -0,0 +1,463 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.cassandra;
    +
    +import com.datastax.driver.core.Cluster;
    +import com.datastax.driver.core.ConsistencyLevel;
    +import com.datastax.driver.core.DataType;
    +import com.datastax.driver.core.Metadata;
    +import com.datastax.driver.core.Row;
    +import com.datastax.driver.core.SSLOptions;
    +import com.datastax.driver.core.Session;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaBuilder;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.authorization.exception.ProviderCreationException;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.logging.ProcessorLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.security.util.SslContextFactory;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +/**
    + * AbstractCassandraProcessor is a base class for Cassandra processors and 
contains logic and variables common to most
    + * processors integrating with Apache Cassandra.
    + */
    +public abstract class AbstractCassandraProcessor extends AbstractProcessor 
{
    +
    +    public static final int DEFAULT_CASSANDRA_PORT = 9042;
    +
    +    private static final Validator HOSTNAME_PORT_VALIDATOR = new 
Validator() {
    +        @Override
    +        public ValidationResult validate(final String subject, final 
String input, final ValidationContext context) {
    +            final List<String> esList = Arrays.asList(input.split(","));
    +            for (String hostnamePort : esList) {
    +                String[] addresses = hostnamePort.split(":");
    +                // Protect against invalid input like 
http://127.0.0.1:9042 (URL scheme should not be there)
    +                if (addresses.length != 2) {
    +                    return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
    +                            "Each entry must be in hostname:port form (no 
scheme such as http://, and port must be specified)")
    +                            .valid(false).build();
    +                }
    +                // Validate the port
    +                String port = addresses[1].trim();
    +                ValidationResult portValidatorResult = 
StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
    +                if (!portValidatorResult.isValid()) {
    +                    return portValidatorResult;
    +                }
    +
    +            }
    +            return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
    +                    "Valid cluster definition").valid(true).build();
    +        }
    +    };
    +
    +    // Common descriptors
    +    public static final PropertyDescriptor CONTACT_POINTS = new 
PropertyDescriptor.Builder()
    +            .name("Cassandra Contact Points")
    +            .description("Contact points are addresses of Cassandra nodes. 
The list of contact points should be "
    +                    + "comma-separated and in hostname:port format. 
Example node1:port,node2:port,...."
    +                    + " The default client port for Cassandra is 9042, but 
the port(s) must be explicitly specified.")
    +            .required(true)
    +            .expressionLanguageSupported(false)
    +            .addValidator(HOSTNAME_PORT_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor KEYSPACE = new 
PropertyDescriptor.Builder()
    +            .name("Keyspace")
    +            .description("The Cassandra Keyspace to connect to. If no 
keyspace is specified, the query will need to "
    +                    + "include the keyspace name before any table 
reference.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The SSL Context Service used to provide client 
certificate information for TLS/SSL "
    +                    + "connections.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
    +            .name("Client Auth")
    +            .description("Client authentication policy when connecting to 
secure (TLS/SSL) cluster. "
    +                    + "Possible values are REQUIRED, WANT, NONE. This 
property is only used when an SSL Context "
    +                    + "has been defined and enabled.")
    +            .required(false)
    +            .allowableValues(SSLContextService.ClientAuth.values())
    +            .defaultValue("REQUIRED")
    +            .build();
    +
    +    public static final PropertyDescriptor USERNAME = new 
PropertyDescriptor.Builder()
    +            .name("Username")
    +            .description("Username to access the Cassandra cluster")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PASSWORD = new 
PropertyDescriptor.Builder()
    +            .name("Password")
    +            .description("Password to access the Cassandra cluster")
    +            .required(false)
    +            .sensitive(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONSISTENCY_LEVEL = new 
PropertyDescriptor.Builder()
    +            .name("Consistency Level")
    +            .description("The strategy for how many replicas must respond 
before results are returned.")
    +            .required(true)
    +            .allowableValues(ConsistencyLevel.values())
    +            .defaultValue("ONE")
    +            .build();
    +
    +    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the record data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Successfully created FlowFile from SQL query 
result set.")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("SQL query execution failed. Incoming FlowFile 
will be penalized and routed to this relationship")
    --- End diff --
    
    The descriptions for the relationships reference SQL. I don't believe 
that's correct.


> Create processors to get/put data with Apache Cassandra
> -------------------------------------------------------
>
>                 Key: NIFI-901
>                 URL: https://issues.apache.org/jira/browse/NIFI-901
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Joseph Witt
>            Assignee: Matt Burgess
>             Fix For: 0.6.0
>
>
> Develop processors to interact with Apache Cassandra.  The current http 
> processors may actually support this as is but such configuration may be too 
> complex to provide the quality user experience desired.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to