Repository: storm Updated Branches: refs/heads/master caeb89242 -> 7e02b71ea
STORM-2301 [storm-cassandra] upgrade cassandra driver to 3.1.2 * Also modify the codebase since driver itself is not backward compatible with 2.1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a1b7b568 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a1b7b568 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a1b7b568 Branch: refs/heads/master Commit: a1b7b568ad02bbd599f908c3ab500c85aa498197 Parents: bb533a9 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Wed Jan 18 16:42:04 2017 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Wed Jan 18 17:50:40 2017 +0900 ---------------------------------------------------------------------- external/storm-cassandra/pom.xml | 2 +- .../query/impl/PreparedStatementBinder.java | 68 ++------------------ .../query/impl/RoutingKeyGenerator.java | 7 +- 3 files changed, 9 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml index e0f41d8..0eff9f6 100644 --- a/external/storm-cassandra/pom.xml +++ b/external/storm-cassandra/pom.xml @@ -37,7 +37,7 @@ <junit.version>4.11</junit.version> <guava.version>16.0.1</guava.version> <commons-lang3.version>3.3</commons-lang3.version> - <cassandra.driver.core.version>2.1.7.1</cassandra.driver.core.version> + <cassandra.driver.core.version>3.1.2</cassandra.driver.core.version> </properties> <developers> http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java index 4606b05..677ef56 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/PreparedStatementBinder.java @@ -19,8 +19,10 @@ package org.apache.storm.cassandra.query.impl; import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.CodecRegistry; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.TupleValue; +import com.datastax.driver.core.TypeTokens; import com.datastax.driver.core.UDTValue; import org.apache.storm.cassandra.query.Column; @@ -71,73 +73,11 @@ public interface PreparedStatementBinder extends Serializable { if(col.isNull()) { boundStatement.setToNull(col.getColumnName()); } else { - bind(boundStatement, col.getColumnName(), col.getVal()); + boundStatement.set(col.getColumnName(), col.getVal(), + CodecRegistry.DEFAULT_INSTANCE.codecFor(col.getVal())); } } return statement.bind(values); } - - /** - * This ugly method comes from {@link com.datastax.driver.core.TypeCodec#getDataTypeFor(Object)}. - */ - static void bind(BoundStatement statement, String name, Object value) { - // Starts with ByteBuffer, so that if already serialized value are provided, we don't have the - // cost of testing a bunch of other types first - if (value instanceof ByteBuffer) - statement.setBytes(name, (ByteBuffer)value); - - if (value instanceof Number) { - if (value instanceof Integer) - statement.setInt(name, (Integer)value); - if (value instanceof Long) - statement.setLong(name, (Long) value); - if (value instanceof Float) - statement.setFloat(name, (Float) value); - if (value instanceof Double) - statement.setDouble(name, (Double)value); - if (value instanceof BigDecimal) - statement.setDecimal(name, (BigDecimal)value); - if (value instanceof BigInteger) - statement.setVarint(name, (BigInteger)value); - throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass())); - } - - if (value instanceof String) - statement.setString(name, (String)value); - - if (value instanceof Boolean) - statement.setBool(name, (Boolean)value); - - if (value instanceof InetAddress) - statement.setInet(name, (InetAddress)value); - - if (value instanceof Date) - statement.setDate(name, (Date)value); - - if (value instanceof UUID) - statement.setUUID(name, (UUID)value); - - if (value instanceof List) { - statement.setList(name, (List)value); - } - - if (value instanceof Set) { - statement.setSet(name, (Set)value); - } - - if (value instanceof Map) { - statement.setMap(name, (Map) value); - } - - if (value instanceof UDTValue) { - statement.setUDTValue(name, (UDTValue)value); - } - - if (value instanceof TupleValue) { - statement.setTupleValue(name, (TupleValue) value); - } - - throw new IllegalArgumentException(String.format("Value of type %s does not correspond to any CQL3 type", value.getClass())); - } } } http://git-wip-us.apache.org/repos/asf/storm/blob/a1b7b568/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java index 57a6689..9c84506 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/RoutingKeyGenerator.java @@ -18,10 +18,10 @@ */ package org.apache.storm.cassandra.query.impl; -import org.apache.storm.tuple.ITuple; -import com.datastax.driver.core.DataType; +import com.datastax.driver.core.CodecRegistry; import com.datastax.driver.core.ProtocolVersion; import com.google.common.base.Preconditions; +import org.apache.storm.tuple.ITuple; import java.io.Serializable; import java.nio.ByteBuffer; @@ -45,7 +45,8 @@ public class RoutingKeyGenerator implements Serializable { List<ByteBuffer> keys = new ArrayList<>(routingKeys.size()); for(String s : routingKeys) { Object value = tuple.getValueByField(s); - keys.add(DataType.serializeValue(value, ProtocolVersion.NEWEST_SUPPORTED)); + ByteBuffer serialized = CodecRegistry.DEFAULT_INSTANCE.codecFor(value).serialize(value, ProtocolVersion.NEWEST_SUPPORTED); + keys.add(serialized); } return keys; }