Repository: nifi Updated Branches: refs/heads/master 7e627f2fe -> c56a7e9ba
NIFI-5510: Introducing PutCassandraRecord processor NIFI-5510: Fixes for PR review comments This closes #2992 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c56a7e9b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c56a7e9b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c56a7e9b Branch: refs/heads/master Commit: c56a7e9ba5808ead129d462c369e73c710dd3145 Parents: 7e627f2 Author: zenfenan <sivaprasanna...@gmail.com> Authored: Fri Aug 17 15:26:47 2018 +0530 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Tue Sep 18 08:10:14 2018 -0400 ---------------------------------------------------------------------- .../nifi-cassandra-processors/pom.xml | 15 ++ .../cassandra/AbstractCassandraProcessor.java | 18 +- .../processors/cassandra/PutCassandraQL.java | 14 -- .../cassandra/PutCassandraRecord.java | 222 +++++++++++++++++++ .../processors/cassandra/QueryCassandra.java | 14 -- .../org.apache.nifi.processor.Processor | 33 +-- .../cassandra/PutCassandraRecordIT.java | 123 ++++++++++ .../cassandra/PutCassandraRecordTest.java | 198 +++++++++++++++++ 8 files changed, 591 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml index 0338d21..d5c36d4 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml @@ -64,5 +64,20 @@ <artifactId>nifi-ssl-context-service</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.8.0-SNAPSHOT</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java index b0f82f9..d1e3874 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java @@ -37,6 +37,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.ssl.SSLContextService; @@ -72,8 +73,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder() .name("Keyspace") - .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to " - + "include the keyspace name before any table reference.") + .description("The Cassandra Keyspace to connect to. If not set, the keyspace name has to be provided with the " + + "table name in the form of <KEYSPACE>.<TABLE>") .required(false) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -131,6 +132,19 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is transferred to this relationship if the operation completed successfully.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is transferred to this relationship if the operation failed.") + .build(); + public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") + .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting " + + "it again may succeed.") + .build(); + static List<PropertyDescriptor> descriptors = new ArrayList<>(); static { http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java index d6f13de..1b14874 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java @@ -119,20 +119,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor { private final static List<PropertyDescriptor> propertyDescriptors; - // Relationships - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Successfully executed CQL statement.") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("CQL statement execution failed.") - .build(); - public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") - .description("A FlowFile is transferred to this relationship if the statement cannot be executed successfully but " - + "attempting the operation again may succeed.") - .build(); - private final static Set<Relationship> relationships; private static final Pattern CQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("cql\\.args\\.(\\d+)\\.type"); http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java new file mode 100644 index 0000000..402ec3d --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; + +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " + + "configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.") +public class PutCassandraRecord extends AbstractCassandraProcessor { + + static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder() + .name("put-cassandra-record-reader") + .displayName("Record Reader") + .description("Specifies the type of Record Reader controller service to use for parsing the incoming data " + + "and determining the schema") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder() + .name("put-cassandra-record-table") + .displayName("Table name") + .description("The name of the Cassandra table to which the records have to be written.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("put-cassandra-record-batch-size") + .displayName("Batch size") + .description("Specifies the number of 'Insert statements' to be grouped together to execute as a batch (BatchStatement)") + .defaultValue("100") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(true) + .build(); + + static final PropertyDescriptor BATCH_STATEMENT_TYPE = new PropertyDescriptor.Builder() + .name("put-cassandra-record-batch-statement-type") + .displayName("Batch Statement Type") + .description("Specifies the type of 'Batch Statement' to be used.") + .allowableValues(BatchStatement.Type.values()) + .defaultValue(BatchStatement.Type.LOGGED.toString()) + .required(false) + .build(); + + static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractCassandraProcessor.CONSISTENCY_LEVEL) + .allowableValues(ConsistencyLevel.SERIAL.name(), ConsistencyLevel.LOCAL_SERIAL.name()) + .defaultValue(ConsistencyLevel.SERIAL.name()) + .build(); + + private final static List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList( + CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD, RECORD_READER_FACTORY, + BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE)); + + private final static Set<Relationship> relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + try { + connectToCassandra(context); + } catch (NoHostAvailableException nhae) { + getLogger().error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae); + getLogger().error(nhae.getCustomMessage(10, true, false)); + throw new ProcessException(nhae); + } catch (AuthenticationException ae) { + getLogger().error("Invalid username/password combination", ae); + throw new ProcessException(ae); + } + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile inputFlowFile = session.get(); + + if (inputFlowFile == null) { + return; + } + + final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue(); + final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue(); + final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue(); + + final BatchStatement batchStatement; + final Session connectionSession = cassandraSession.get(); + final AtomicInteger recordsAdded = new AtomicInteger(0); + final StopWatch stopWatch = new StopWatch(true); + + boolean error = false; + + try (final InputStream inputStream = session.read(inputFlowFile); + final RecordReader reader = recordParserFactory.createRecordReader(inputFlowFile, inputStream, getLogger())){ + + final RecordSchema schema = reader.getSchema(); + Record record; + + batchStatement = new BatchStatement(BatchStatement.Type.valueOf(batchStatementType)); + batchStatement.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevel)); + + while((record = reader.nextRecord()) != null) { + Map<String, Object> recordContentMap = (Map<String, Object>) DataTypeUtils + .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); + Insert insertQuery; + + if (cassandraTable.contains(".")) { + String keyspaceAndTable[] = cassandraTable.split("\\."); + insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]); + } else { + insertQuery = QueryBuilder.insertInto(cassandraTable); + } + for (String fieldName : schema.getFieldNames()) { + insertQuery.value(fieldName, recordContentMap.get(fieldName)); + } + batchStatement.add(insertQuery); + + if (recordsAdded.incrementAndGet() == batchSize) { + connectionSession.execute(batchStatement); + batchStatement.clear(); + } + } + + if (batchStatement.size() != 0) { + connectionSession.execute(batchStatement); + batchStatement.clear(); + } + + } catch (Exception e) { + error = true; + getLogger().error("Unable to write the records into Cassandra table due to {}", new Object[] {e}); + session.transfer(inputFlowFile, REL_FAILURE); + } finally { + if (!error) { + stopWatch.stop(); + long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS); + String transitUri = "cassandra://" + connectionSession.getCluster().getMetadata().getClusterName() + "." + cassandraTable; + + session.getProvenanceReporter().send(inputFlowFile, transitUri, "Inserted " + recordsAdded.get() + " records", duration); + session.transfer(inputFlowFile, REL_SUCCESS); + } + } + + } + + @OnUnscheduled + public void stop() { + super.stop(); + } + + @OnShutdown + public void shutdown() { + super.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java index 384c395..40b88cc 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java @@ -132,20 +132,6 @@ public class QueryCassandra extends AbstractCassandraProcessor { private final static List<PropertyDescriptor> propertyDescriptors; - // Relationships - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Successfully created FlowFile from CQL query result set.") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("CQL query execution failed.") - .build(); - public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") - .description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting " - + "the operation again may succeed.") - .build(); - private final static Set<Relationship> relationships; /* http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 34cf4ef..88aa0d8 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -1,16 +1,17 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.cassandra.QueryCassandra -org.apache.nifi.processors.cassandra.PutCassandraQL \ No newline at end of file +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.cassandra.QueryCassandra +org.apache.nifi.processors.cassandra.PutCassandraQL +org.apache.nifi.processors.cassandra.PutCassandraRecord \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java new file mode 100644 index 0000000..07b9ac6 --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.datastax.driver.core.querybuilder.Truncate; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class PutCassandraRecordIT { + + private static TestRunner testRunner; + private static MockRecordParser recordReader; + + private static Cluster cluster; + private static Session session; + + private static final String KEYSPACE = "sample_keyspace"; + private static final String TABLE = "sample_table"; + private static final String HOST = "localhost"; + private static final int PORT = 9042; + + @BeforeClass + public static void setup() throws InitializationException { + recordReader = new MockRecordParser(); + testRunner = TestRunners.newTestRunner(PutCassandraRecord.class); + + testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader"); + testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, HOST + ":" + PORT); + testRunner.setProperty(PutCassandraRecord.KEYSPACE, KEYSPACE); + testRunner.setProperty(PutCassandraRecord.TABLE, TABLE); + testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + + cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT).build(); + session = cluster.connect(); + + String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class':'SimpleStrategy','replication_factor':1};"; + String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE + "(id int PRIMARY KEY, name text, age int);"; + + session.execute(createKeyspace); + session.execute(createTable); + } + + @Test + public void testSimplePut() { + recordReader.addSchemaField("id", RecordFieldType.INT); + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + + recordReader.addRecord(1, "Ram", 42); + recordReader.addRecord(2, "Jeane", 47); + recordReader.addRecord(3, "Ilamaran", 27); + recordReader.addRecord(4, "Jian", 14); + recordReader.addRecord(5, "Sakura", 24); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); + Assert.assertEquals(5, getRecordsCount()); + } + + private int getRecordsCount() { + Select selectQuery = QueryBuilder.select().all().from(KEYSPACE, TABLE); + ResultSet result = session.execute(selectQuery); + + List<Integer> resultsList = result.all() + .stream() + .map(r -> r.getInt(0)) + .collect(Collectors.toList()); + + dropRecords(); + return resultsList.size(); + } + + private void dropRecords() { + Truncate query = QueryBuilder.truncate(KEYSPACE, TABLE); + session.execute(query); + } + + @AfterClass + public static void shutdown() { + String dropKeyspace = "DROP KEYSPACE " + KEYSPACE; + String dropTable = "DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE; + + session.execute(dropTable); + session.execute(dropKeyspace); + + session.close(); + cluster.close(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java new file mode 100644 index 0000000..88a9b5d --- /dev/null +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PutCassandraRecordTest { + + private TestRunner testRunner; + private MockRecordParser recordReader; + + @Before + public void setUp() throws Exception { + MockPutCassandraRecord processor = new MockPutCassandraRecord(); + recordReader = new MockRecordParser(); + testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader"); + } + + @Test + public void testProcessorConfigValidity() throws InitializationException { + testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "localhost:9042"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.PASSWORD, "password"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.USERNAME, "username"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks"); + testRunner.assertNotValid(); + + testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl"); + testRunner.assertNotValid(); + + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + testRunner.assertValid(); + } + + private void setUpStandardTestConfig() throws InitializationException { + testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); + testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); + testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); + testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); + testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl"); + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + } + + @Test + public void testSimplePut() throws InitializationException { + setUpStandardTestConfig(); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, "Soccer"); + recordReader.addRecord("Jane Doe", 47, "Tennis"); + recordReader.addRecord("Sally Doe", 47, "Curling"); + recordReader.addRecord("Jimmy Doe", 14, null); + recordReader.addRecord("Pizza Doe", 14, null); + + testRunner.enqueue(""); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); + } + + @Test + public void testEL() throws InitializationException { + testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}"); + testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}"); + testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}"); + testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL"); + testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED"); + testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl"); + testRunner.addControllerService("reader", recordReader); + testRunner.enableControllerService(recordReader); + + testRunner.assertValid(); + + testRunner.setVariable("contact.points", "localhost:9042"); + testRunner.setVariable("user", "username"); + testRunner.setVariable("pass", "password"); + + recordReader.addSchemaField("name", RecordFieldType.STRING); + recordReader.addSchemaField("age", RecordFieldType.INT); + recordReader.addSchemaField("sport", RecordFieldType.STRING); + + recordReader.addRecord("John Doe", 48, "Soccer"); + recordReader.addRecord("Jane Doe", 47, "Tennis"); + recordReader.addRecord("Sally Doe", 47, "Curling"); + recordReader.addRecord("Jimmy Doe", 14, null); + recordReader.addRecord("Pizza Doe", 14, null); + + testRunner.enqueue(""); + testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1); + } + + private static class MockPutCassandraRecord extends PutCassandraRecord { + private Exception exceptionToThrow = null; + private Session mockSession = mock(Session.class); + + @Override + protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, + String username, String password) { + Cluster mockCluster = mock(Cluster.class); + try { + Metadata mockMetadata = mock(Metadata.class); + when(mockMetadata.getClusterName()).thenReturn("cluster1"); + when(mockCluster.getMetadata()).thenReturn(mockMetadata); + when(mockCluster.connect()).thenReturn(mockSession); + when(mockCluster.connect(anyString())).thenReturn(mockSession); + Configuration config = Configuration.builder().build(); + when(mockCluster.getConfiguration()).thenReturn(config); + ResultSetFuture future = mock(ResultSetFuture.class); + ResultSet rs = CassandraQueryTestUtil.createMockResultSet(); + PreparedStatement ps = mock(PreparedStatement.class); + when(mockSession.prepare(anyString())).thenReturn(ps); + BoundStatement bs = mock(BoundStatement.class); + when(ps.bind()).thenReturn(bs); + when(future.getUninterruptibly()).thenReturn(rs); + try { + doReturn(rs).when(future).getUninterruptibly(anyLong(), any(TimeUnit.class)); + } catch (TimeoutException te) { + throw new IllegalArgumentException("Mocked cluster doesn't time out"); + } + if (exceptionToThrow != null) { + doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString()); + doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class)); + + } else { + when(mockSession.executeAsync(anyString())).thenReturn(future); + when(mockSession.executeAsync(any(Statement.class))).thenReturn(future); + } + when(mockSession.getCluster()).thenReturn(mockCluster); + } catch (Exception e) { + fail(e.getMessage()); + } + return mockCluster; + } + } +}