Repository: nifi
Updated Branches:
  refs/heads/master 75bb4bfaa -> 6cbc58543


NIFI-1998: Upgraded Cassandra driver to 3.0.2

This closes #521.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6cbc5854
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6cbc5854
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6cbc5854

Branch: refs/heads/master
Commit: 6cbc585438572e440c07722dc8c13d6be63e9966
Parents: 75bb4bf
Author: Matt Burgess <mattyb...@apache.org>
Authored: Fri Jun 10 15:25:09 2016 -0400
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Mon Jun 13 23:28:14 2016 +0200

----------------------------------------------------------------------
 .../nifi-cassandra-processors/pom.xml           |  2 +-
 .../cassandra/AbstractCassandraProcessor.java   | 21 +++++++++----
 .../processors/cassandra/PutCassandraQL.java    | 32 ++++++++++++--------
 .../AbstractCassandraProcessorTest.java         |  2 +-
 .../cassandra/CassandraQueryTestUtil.java       |  9 ++++--
 .../cassandra/PutCassandraQLTest.java           |  2 +-
 .../cassandra/QueryCassandraTest.java           |  2 +-
 7 files changed, 44 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
index 68e1eeb..a341fa0 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
@@ -41,7 +41,7 @@
         <dependency>
             <groupId>com.datastax.cassandra</groupId>
             <artifactId>cassandra-driver-core</artifactId>
-            <version>2.1.9</version>
+            <version>3.0.2</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index 3f93b81..d703ee8 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -17,12 +17,14 @@
 package org.apache.nifi.processors.cassandra;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.JdkSSLOptions;
 import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TypeCodec;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.commons.lang3.StringUtils;
@@ -167,6 +169,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
     protected final AtomicReference<Cluster> cluster = new AtomicReference<>(null);
     protected final AtomicReference<Session> cassandraSession = new AtomicReference<>(null);
 
+    protected static final CodecRegistry codecRegistry = new CodecRegistry();
+
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         Set<ValidationResult> results = new HashSet<>();
@@ -253,7 +257,10 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
                                     String username, String password) {
         Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints);
         if (sslContext != null) {
-            builder = builder.withSSL(new SSLOptions(sslContext, SSLOptions.DEFAULT_SSL_CIPHER_SUITES));
+            JdkSSLOptions sslOptions = JdkSSLOptions.builder()
+                    .withSSLContext(sslContext)
+                    .build();
+            builder = builder.withSSL(sslOptions);
         }
         if (username != null && password != null) {
             builder = builder.withCredentials(username, password);
@@ -315,15 +322,17 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             }
             // Get the first type argument, to be used for lists and sets (and the first in a map)
             DataType firstArg = typeArguments.get(0);
+            TypeCodec firstCodec = codecRegistry.codecFor(firstArg);
             if (dataType.equals(DataType.set(firstArg))) {
-                return row.getSet(i, firstArg.asJavaClass());
+                return row.getSet(i, firstCodec.getJavaType());
             } else if (dataType.equals(DataType.list(firstArg))) {
-                return row.getList(i, firstArg.asJavaClass());
+                return row.getList(i, firstCodec.getJavaType());
             } else {
                 // Must be an n-arg collection like map
                 DataType secondArg = typeArguments.get(1);
+                TypeCodec secondCodec = codecRegistry.codecFor(secondArg);
                 if (dataType.equals(DataType.map(firstArg, secondArg))) {
-                    return row.getMap(i, firstArg.asJavaClass(), secondArg.asJavaClass());
+                    return row.getMap(i, firstCodec.getJavaType(), secondCodec.getJavaType());
                 }
             }
 
@@ -427,7 +436,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
                 return primitiveType;
             }
         }
-        throw new IllegalArgumentException("Not a primitive Cassandra type: " + dataTypeName);
+        return null;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index 8c0908a..4af3aa7 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TypeCodec;
 import com.datastax.driver.core.exceptions.AuthenticationException;
 import com.datastax.driver.core.exceptions.InvalidTypeException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -119,7 +120,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
     // Matches on top-level type (primitive types like text,int) and also for collections (like list<boolean> and map<float,double>)
     private static final Pattern CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
 
-
     /*
      * Will ensure that the list of property descriptors is build only once.
      * Will also create a Set of relationships
@@ -310,9 +310,9 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
             // If the matcher doesn't match, this should fall through to the exception at the bottom
             if (matcher.find() && matcher.groupCount() > 1) {
                 String mainTypeString = matcher.group(1).toLowerCase();
-                DataType.Name mainTypeName = DataType.Name.valueOf(mainTypeString.toUpperCase());
-                if (!mainTypeName.isCollection()) {
-                    DataType mainType = getPrimitiveDataTypeFromString(mainTypeString);
+                DataType mainType = getPrimitiveDataTypeFromString(mainTypeString);
+                if (mainType != null) {
+                    TypeCodec typeCodec = codecRegistry.codecFor(mainType);
 
                     // Need the right statement.setXYZ() method
                     if (mainType.equals(DataType.ascii())
@@ -327,23 +327,23 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
                         statement.setString(paramIndex, paramValue);
 
                     } else if (mainType.equals(DataType.cboolean())) {
-                        statement.setBool(paramIndex, (boolean) mainType.parse(paramValue));
+                        statement.setBool(paramIndex, (boolean) typeCodec.parse(paramValue));
 
                     } else if (mainType.equals(DataType.cint())) {
-                        statement.setInt(paramIndex, (int) mainType.parse(paramValue));
+                        statement.setInt(paramIndex, (int) typeCodec.parse(paramValue));
 
                     } else if (mainType.equals(DataType.bigint())
                             || mainType.equals(DataType.counter())) {
-                        statement.setLong(paramIndex, (long) mainType.parse(paramValue));
+                        statement.setLong(paramIndex, (long) typeCodec.parse(paramValue));
 
                     } else if (mainType.equals(DataType.cfloat())) {
-                        statement.setFloat(paramIndex, (float) mainType.parse(paramValue));
+                        statement.setFloat(paramIndex, (float) typeCodec.parse(paramValue));
 
                     } else if (mainType.equals(DataType.cdouble())) {
-                        statement.setDouble(paramIndex, (double) mainType.parse(paramValue));
+                        statement.setDouble(paramIndex, (double) typeCodec.parse(paramValue));
 
                     } else if (mainType.equals(DataType.blob())) {
-                        statement.setBytes(paramIndex, (ByteBuffer) mainType.parse(paramValue));
+                        statement.setBytes(paramIndex, (ByteBuffer) typeCodec.parse(paramValue));
 
                     }
                     return;
@@ -352,22 +352,28 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
                     if (matcher.groupCount() > 2) {
                         String firstParamTypeName = matcher.group(3);
                         DataType firstParamType = getPrimitiveDataTypeFromString(firstParamTypeName);
+                        if (firstParamType == null) {
+                            throw new IllegalArgumentException("Nested collections are not supported");
+                        }
 
                         // Check for map type
                         if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeString)) {
                             if (matcher.groupCount() > 4) {
                                 String secondParamTypeName = matcher.group(5);
                                 DataType secondParamType = getPrimitiveDataTypeFromString(secondParamTypeName);
-                                statement.setMap(paramIndex, (Map) DataType.map(firstParamType, secondParamType).parse(paramValue));
+                                DataType mapType = DataType.map(firstParamType, secondParamType);
+                                statement.setMap(paramIndex, (Map) codecRegistry.codecFor(mapType).parse(paramValue));
                                 return;
                             }
                         } else {
                             // Must be set or list
                             if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeString)) {
-                                statement.setSet(paramIndex, (Set) DataType.set(firstParamType).parse(paramValue));
+                                DataType setType = DataType.set(firstParamType);
+                                statement.setSet(paramIndex, (Set) codecRegistry.codecFor(setType).parse(paramValue));
                                 return;
                             } else if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeString)) {
-                                statement.setList(paramIndex, (List) DataType.list(firstParamType).parse(paramValue));
+                                DataType listType = DataType.list(firstParamType);
+                                statement.setList(paramIndex, (List) codecRegistry.codecFor(listType).parse(paramValue));
                                 return;
                             }
                         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
index 19e2320..3b0e273 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
@@ -270,7 +270,7 @@ public class AbstractCassandraProcessorTest {
             Metadata mockMetadata = mock(Metadata.class);
             when(mockMetadata.getClusterName()).thenReturn("cluster1");
             when(mockCluster.getMetadata()).thenReturn(mockMetadata);
-            Configuration config = new Configuration();
+            Configuration config = Configuration.builder().build();
             when(mockCluster.getConfiguration()).thenReturn(config);
             return mockCluster;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
index 17f2c27..0d5571e 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/CassandraQueryTestUtil.java
@@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -34,7 +35,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -107,9 +110,9 @@ public class CassandraQueryTestUtil {
         when(row.getString(0)).thenReturn(user_id);
         when(row.getString(1)).thenReturn(first_name);
         when(row.getString(2)).thenReturn(last_name);
-        when(row.getSet(3, String.class)).thenReturn(emails);
-        when(row.getList(4, String.class)).thenReturn(top_places);
-        when(row.getMap(5, Date.class, String.class)).thenReturn(todo);
+        when(row.getSet(eq(3), any(TypeToken.class))).thenReturn(emails);
+        when(row.getList(eq(4), any(TypeToken.class))).thenReturn(top_places);
+        when(row.getMap(eq(5), any(TypeToken.class), any(TypeToken.class))).thenReturn(todo);
         when(row.getBool(6)).thenReturn(registered);
         when(row.getFloat(7)).thenReturn(scale);
         when(row.getDouble(8)).thenReturn(metric);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index 17e52dd..1a40556 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -180,7 +180,7 @@ public class PutCassandraQLTest {
                 when(mockCluster.getMetadata()).thenReturn(mockMetadata);
                 when(mockCluster.connect()).thenReturn(mockSession);
                 when(mockCluster.connect(anyString())).thenReturn(mockSession);
-                Configuration config = new Configuration();
+                Configuration config = Configuration.builder().build();
                 when(mockCluster.getConfiguration()).thenReturn(config);
                 ResultSetFuture future = mock(ResultSetFuture.class);
                 ResultSet rs = CassandraQueryTestUtil.createMockResultSet();

http://git-wip-us.apache.org/repos/asf/nifi/blob/6cbc5854/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
index 444da4d..023f239 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/QueryCassandraTest.java
@@ -340,7 +340,7 @@ public class QueryCassandraTest {
                 Session mockSession = mock(Session.class);
                 when(mockCluster.connect()).thenReturn(mockSession);
                 when(mockCluster.connect(anyString())).thenReturn(mockSession);
-                Configuration config = new Configuration();
+                Configuration config = Configuration.builder().build();
                 when(mockCluster.getConfiguration()).thenReturn(config);
                 ResultSetFuture future = mock(ResultSetFuture.class);
                 ResultSet rs = CassandraQueryTestUtil.createMockResultSet();

Reply via email to