Repository: storm
Updated Branches:
  refs/heads/master caeb89242 -> 7e02b71ea


STORM-2301 [storm-cassandra] upgrade cassandra driver to 3.1.2

* Also modify the codebase, since the new driver is not backward compatible
with the 2.1 driver API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1b7b568
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1b7b568
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1b7b568

Branch: refs/heads/master
Commit: a1b7b568ad02bbd599f908c3ab500c85aa498197
Parents: bb533a9
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Wed Jan 18 16:42:04 2017 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Wed Jan 18 17:50:40 2017 +0900

----------------------------------------------------------------------
 external/storm-cassandra/pom.xml                |  2 +-
 .../query/impl/PreparedStatementBinder.java     | 68 ++------------------
 .../query/impl/RoutingKeyGenerator.java         |  7 +-
 3 files changed, 9 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml
index e0f41d8..0eff9f6 100644
--- a/external/storm-cassandra/pom.xml
+++ b/external/storm-cassandra/pom.xml
@@ -37,7 +37,7 @@
         <junit.version>4.11</junit.version>
         <guava.version>16.0.1</guava.version>
         <commons-lang3.version>3.3</commons-lang3.version>
-        <cassandra.driver.core.version>2.1.7.1</cassandra.driver.core.version>
+        <cassandra.driver.core.version>3.1.2</cassandra.driver.core.version>
     </properties>
 
     <developers>

http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
index 4606b05..677ef56 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java
@@ -19,8 +19,10 @@
 package org.apache.storm.cassandra.query.impl;
 
 import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.TupleValue;
+import com.datastax.driver.core.TypeTokens;
 import com.datastax.driver.core.UDTValue;
 import org.apache.storm.cassandra.query.Column;
 
@@ -71,73 +73,11 @@ public interface PreparedStatementBinder extends Serializable {
                 if(col.isNull()) {
                     boundStatement.setToNull(col.getColumnName());
                 } else {
-                    bind(boundStatement, col.getColumnName(), col.getVal());
+                    boundStatement.set(col.getColumnName(), col.getVal(),
+                            CodecRegistry.DEFAULT_INSTANCE.codecFor(col.getVal()));
                 }
             }
             return statement.bind(values);
         }
-
-        /**
-         * This ugly method comes from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}.
-         */
-        static void bind(BoundStatement statement, String name, Object value) {
-            // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the
-            // cost of testing a bunch of other types first
-            if (value instanceof ByteBuffer)
-                statement.setBytes(name, (ByteBuffer)value);
-
-            if (value instanceof Number) {
-                if (value instanceof Integer)
-                    statement.setInt(name, (Integer)value);
-                if (value instanceof Long)
-                    statement.setLong(name, (Long) value);
-                if (value instanceof Float)
-                    statement.setFloat(name, (Float) value);
-                if (value instanceof Double)
-                    statement.setDouble(name, (Double)value);
-                if (value instanceof BigDecimal)
-                    statement.setDecimal(name, (BigDecimal)value);
-                if (value instanceof BigInteger)
-                    statement.setVarint(name, (BigInteger)value);
-                throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
-            }
-
-            if (value instanceof String)
-                statement.setString(name, (String)value);
-
-            if (value instanceof Boolean)
-                statement.setBool(name, (Boolean)value);
-
-            if (value instanceof InetAddress)
-                statement.setInet(name, (InetAddress)value);
-
-            if (value instanceof Date)
-                statement.setDate(name, (Date)value);
-
-            if (value instanceof UUID)
-                statement.setUUID(name, (UUID)value);
-
-            if (value instanceof List) {
-                statement.setList(name, (List)value);
-            }
-
-            if (value instanceof Set) {
-                statement.setSet(name, (Set)value);
-            }
-
-            if (value instanceof Map) {
-                statement.setMap(name, (Map) value);
-            }
-
-            if (value instanceof UDTValue) {
-                statement.setUDTValue(name, (UDTValue)value);
-            }
-
-            if (value instanceof TupleValue) {
-                statement.setTupleValue(name, (TupleValue) value);
-            }
-
-            throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass()));
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
----------------------------------------------------------------------
diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
index 57a6689..9c84506 100644
--- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
+++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java
@@ -18,10 +18,10 @@
  */
 package org.apache.storm.cassandra.query.impl;
 
-import org.apache.storm.tuple.ITuple;
-import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.CodecRegistry;
 import com.datastax.driver.core.ProtocolVersion;
 import com.google.common.base.Preconditions;
+import org.apache.storm.tuple.ITuple;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -45,7 +45,8 @@ public class RoutingKeyGenerator implements Serializable {
         List<ByteBuffer> keys = new ArrayList<>(routingKeys.size());
         for(String s : routingKeys) {
             Object value = tuple.getValueByField(s);
-            keys.add(DataType.serializeValue(value, ProtocolVersion.NEWEST_SUPPORTED));
+            ByteBuffer serialized = CodecRegistry.DEFAULT_INSTANCE.codecFor(value).serialize(value, ProtocolVersion.NEWEST_SUPPORTED);
+            keys.add(serialized);
         }
         return keys;
     }

Reply via email to