[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2992


---


[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r218200235
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("This is a record aware processor that reads the 
content of the incoming FlowFile as individual records using the " +
+"configured 'Record Reader' and writes them to Apache Cassandra 
using native protocol version 3 or higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-batch-size")
+.displayName("Batch size")
+  

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-17 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r218199589
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("This is a record aware processor that reads the 
content of the incoming FlowFile as individual records using the " +
+"configured 'Record Reader' and writes them to Apache Cassandra 
using native protocol version 3 or higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-batch-size")
+.displayName("Batch size")
+  

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-16 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r217948182
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-16 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r217948138
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-16 Thread zenfenan
Github user zenfenan commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r217947988
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
--- End diff --

Understood. I'll update it.


---


[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216070172
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216353686
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216353050
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216354635
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216353534
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216355406
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml ---
@@ -64,5 +64,20 @@
 nifi-ssl-context-service
 test
 
+
--- End diff --

No new dependencies from outside of our project, so L&N looks good.


---


[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216070922
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-reader")
+.displayName("Record Reader")
+.description("Specifies the type of Record Reader controller 
service to use for parsing the incoming data " +
+"and determining the schema")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+static final PropertyDescriptor TABLE = new 
PropertyDescriptor.Builder()
+.name("put-cassandra-record-table")
+.displayName("Table name")
+.description("The name of the Cassandra table to which the 
records have to be written.")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
+.name("put-cassand

[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-10 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2992#discussion_r216069857
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 ---
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Writes the content of the incoming FlowFile as 
individual records to Apache Cassandra using native protocol version 3 or 
higher.")
--- End diff --

It would be helpful to have more content here, particularly explicitly 
calling out that it is a record-aware processor.


---


[GitHub] nifi pull request #2992: NIFI-5510: Introducing PutCassandraRecord processor

2018-09-05 Thread zenfenan
GitHub user zenfenan opened a pull request:

https://github.com/apache/nifi/pull/2992

NIFI-5510: Introducing PutCassandraRecord processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [x] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zenfenan/nifi NIFI-5510

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2992.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2992


commit 4934a159fb769961c47d3809b522dfaa197fc1ac
Author: zenfenan 
Date:   2018-08-17T09:56:47Z

NIFI-5510: Introducing PutCassandraRecord processor




---