Updated Branches: refs/heads/trunk d54a93f2d -> 8b00f3a25
Fix support of collections in prepared statements patch by slebresne; reviewed by jbellis for CASSANDRA-4739 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b00f3a2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b00f3a2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b00f3a2 Branch: refs/heads/trunk Commit: 8b00f3a258fcc04f0350d4f46760eacacbfed3df Parents: d54a93f Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Oct 5 09:29:17 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Oct 5 09:29:17 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol.spec | 25 ++++++++- .../cassandra/cql3/operations/ColumnOperation.java | 20 ++++++++ .../cassandra/cql3/operations/ListOperation.java | 27 ++++++++++ .../cassandra/cql3/operations/MapOperation.java | 25 +++++++++ .../cassandra/cql3/operations/SetOperation.java | 28 ++++++++++ .../cassandra/cql3/statements/UpdateStatement.java | 35 +++++++++----- .../org/apache/cassandra/db/marshal/ListType.java | 28 +++++++--- .../org/apache/cassandra/db/marshal/MapType.java | 39 ++++++++++----- .../org/apache/cassandra/db/marshal/SetType.java | 28 +++++++--- 10 files changed, 210 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8d25639..868183e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -19,6 +19,7 @@ * Add support for multiple column family outputs in CFOF (CASSANDRA-4208) * Support repairing only the local DC nodes (CASSANDRA-4747) * Use rpc_address for binary protocol and change default port (CASSANRA-4751) + * Fix use of collections in prepared statements (CASSANDRA-4739) 1.2-beta1 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index 6666908..2e03a02 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -33,7 +33,8 @@ Table of Contents 4.2.5.4. Prepared 4.2.6. EVENT 5. Compression - 6. Error codes + 6. Collection types + 7. Error codes 1. Overview @@ -286,7 +287,7 @@ Table of Contents Indicates an error processing a request. The body of the message will be an error code ([int]) followed by a [string] error message. Then, depending on the exception, more content may follow. The error codes are defined in - Section 6, along with their additional content if any. + Section 7, along with their additional content if any. 4.2.2. READY @@ -452,7 +453,25 @@ Table of Contents flag (see Section 2.2) is set. -6. Error codes +6. Collection types + + This section describes the serialization format for the collection types: + list, map and set. This serialization format is useful both to decode values + returned in RESULT messages and to encode values for EXECUTE ones. + + The serialization formats are: + List: a [short] n indicating the size of the list, followed by n elements. + Each element is [short bytes] representing the serialized element + value. + Map: a [short] n indicating the size of the map, followed by n entries. + Each entry is composed of two [short bytes] representing the key and + the value of the map entry. + Set: a [short] n indicating the size of the set, followed by n elements. + Each element is [short bytes] representing the serialized element + value. + + +7. Error codes The supported error codes are described below: 0x0000 Server error: something unexpected happened. 
This indicates a http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/cql3/operations/ColumnOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/operations/ColumnOperation.java b/src/java/org/apache/cassandra/cql3/operations/ColumnOperation.java index e7086c1..0f4c1fc 100644 --- a/src/java/org/apache/cassandra/cql3/operations/ColumnOperation.java +++ b/src/java/org/apache/cassandra/cql3/operations/ColumnOperation.java @@ -27,6 +27,9 @@ import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.marshal.LongType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; @@ -80,6 +83,23 @@ public class ColumnOperation implements Operation throw new InvalidRequestException("Column operations are only supported on simple types, but " + validator + " given."); } + public void executePreparedCollection(ColumnFamily cf, ColumnNameBuilder builder, CollectionType validator, UpdateParameters params) throws InvalidRequestException + { + + switch (validator.kind) + { + case LIST: + ListOperation.doInsertFromPrepared(cf, builder, (ListType)validator, value, params); + break; + case SET: + SetOperation.doInsertFromPrepared(cf, builder, (SetType)validator, value, params); + break; + case MAP: + MapOperation.doInsertFromPrepared(cf, builder, (MapType)validator, value, params); + break; + } + } + protected void doSet(ColumnFamily cf, ColumnNameBuilder builder, AbstractType<?> validator, UpdateParameters params) throws InvalidRequestException { ByteBuffer colName = builder.build(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/cql3/operations/ListOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java index 3de984b..b40cc18 100644 --- a/src/java/org/apache/cassandra/cql3/operations/ListOperation.java +++ b/src/java/org/apache/cassandra/cql3/operations/ListOperation.java @@ -31,6 +31,8 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDGen; @@ -129,6 +131,31 @@ public class ListOperation implements Operation throw new InvalidRequestException("List operations are only supported on List typed columns, but " + validator + " given."); } + public static void doInsertFromPrepared(ColumnFamily cf, ColumnNameBuilder builder, ListType validator, Term values, UpdateParameters params) throws InvalidRequestException + { + if (!values.isBindMarker()) + throw new InvalidRequestException("Can't apply operation on column with " + validator + " type."); + + cf.addAtom(params.makeTombstoneForOverwrite(builder.copy().build(), builder.copy().buildAsEndOfRange())); + + try + { + List<?> l = validator.compose(params.variables.get(values.bindIndex)); + + for (int i = 0; i < l.size(); i++) + { + ColumnNameBuilder b = i == l.size() - 1 ? 
builder : builder.copy(); + ByteBuffer uuid = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); + ByteBuffer name = b.add(uuid).build(); + cf.addColumn(params.makeColumn(name, validator.valueComparator().decompose(l.get(i)))); + } + } + catch (MarshalException e) + { + throw new InvalidRequestException(e.getMessage()); + } + } + private void doSet(ColumnFamily cf, ColumnNameBuilder builder, UpdateParameters params, CollectionType validator, List<Pair<ByteBuffer, IColumn>> list) throws InvalidRequestException { int idx = validateListIdx(values.get(0), list); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/cql3/operations/MapOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/operations/MapOperation.java b/src/java/org/apache/cassandra/cql3/operations/MapOperation.java index 56b1038..39a5413 100644 --- a/src/java/org/apache/cassandra/cql3/operations/MapOperation.java +++ b/src/java/org/apache/cassandra/cql3/operations/MapOperation.java @@ -30,6 +30,8 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.Pair; @@ -84,6 +86,29 @@ public class MapOperation implements Operation } } + public static void doInsertFromPrepared(ColumnFamily cf, ColumnNameBuilder builder, MapType validator, Term values, UpdateParameters params) throws InvalidRequestException + { + if (!values.isBindMarker()) + throw new InvalidRequestException("Can't apply operation on column with " + validator + " type."); + + cf.addAtom(params.makeTombstoneForOverwrite(builder.copy().build(), builder.copy().buildAsEndOfRange())); + + try + { + 
Map<?, ?> m = validator.compose(params.variables.get(values.bindIndex)); + for (Map.Entry<?, ?> entry : m.entrySet()) + { + ByteBuffer name = builder.copy().add(validator.nameComparator().decompose(entry.getKey())).build(); + ByteBuffer value = validator.valueComparator().decompose(entry.getValue()); + cf.addColumn(params.makeColumn(name, value)); + } + } + catch (MarshalException e) + { + throw new InvalidRequestException(e.getMessage()); + } + } + private void doPut(ColumnFamily cf, ColumnNameBuilder builder, CollectionType validator, UpdateParameters params) throws InvalidRequestException { for (Map.Entry<Term, Term> entry : values.entrySet()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/cql3/operations/SetOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/operations/SetOperation.java b/src/java/org/apache/cassandra/cql3/operations/SetOperation.java index a31059c..36330f7 100644 --- a/src/java/org/apache/cassandra/cql3/operations/SetOperation.java +++ b/src/java/org/apache/cassandra/cql3/operations/SetOperation.java @@ -18,7 +18,9 @@ package org.apache.cassandra.cql3.operations; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.cassandra.cql3.ColumnNameBuilder; import org.apache.cassandra.cql3.Term; @@ -27,6 +29,8 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.MarshalException; +import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; @@ -73,6 +77,30 @@ public class SetOperation implements Operation throw new InvalidRequestException("Set 
operations are only supported on Set typed columns, but " + validator + " given."); } + public static void doInsertFromPrepared(ColumnFamily cf, ColumnNameBuilder builder, SetType validator, Term values, UpdateParameters params) throws InvalidRequestException + { + if (!values.isBindMarker()) + throw new InvalidRequestException("Can't apply operation on column with " + validator + " type."); + + cf.addAtom(params.makeTombstoneForOverwrite(builder.copy().build(), builder.copy().buildAsEndOfRange())); + + try + { + Set<?> s = validator.compose(params.variables.get(values.bindIndex)); + Iterator<?> iter = s.iterator(); + while (iter.hasNext()) + { + ColumnNameBuilder b = iter.hasNext() ? builder.copy() : builder; + ByteBuffer name = b.add(validator.nameComparator().decompose(iter.next())).build(); + cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER)); + } + } + catch (MarshalException e) + { + throw new InvalidRequestException(e.getMessage()); + } + } + private void doAdd(ColumnFamily cf, ColumnNameBuilder builder, CollectionType validator, UpdateParameters params) throws InvalidRequestException { for (int i = 0; i < values.size(); ++i) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index cb4261f..c3401b2 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -264,22 +264,33 @@ public class UpdateStatement extends ModificationStatement { Operation.Type type = valueOperation.getType(); - if (type == Operation.Type.COLUMN || type == Operation.Type.COUNTER) + switch (type) { - if (valueDef != null && valueDef.type.isCollection()) - throw new 
InvalidRequestException("Can't apply operation on column with " + valueDef.type + " type."); + case COLUMN: + if (valueDef != null && valueDef.type.isCollection()) + { + // This means this was a prepared statement where the whole collection was provided + // We have to deserialize it since it will be multiple columns + ((ColumnOperation)valueOperation).executePreparedCollection(cf, builder.copy(), (CollectionType)valueDef.type, params); + } + else + { + valueOperation.execute(cf, builder.copy(), valueDef == null ? null : valueDef.type, params); + } + break; + case COUNTER: + if (valueDef != null && valueDef.type.isCollection()) + throw new InvalidRequestException("Cannot assign collection value to column with " + valueDef.type + " type."); - AbstractType<?> validator = valueDef == null ? null : valueDef.type; - valueOperation.execute(cf, builder.copy(), validator, params); - } - else - { - if (!valueDef.type.isCollection()) - throw new InvalidRequestException("Can't apply collection operation on column with " + valueDef.type + " type."); + valueOperation.execute(cf, builder.copy(), valueDef == null ? 
null : valueDef.type, params); + break; + default: + if (!valueDef.type.isCollection()) + throw new InvalidRequestException("Can't apply collection operation on column with " + valueDef.type + " type."); - valueOperation.execute(cf, builder.copy(), (CollectionType) valueDef.type, params, list); + valueOperation.execute(cf, builder.copy(), (CollectionType) valueDef.type, params, list); + break; } - return valueOperation.getType() == Operation.Type.COUNTER; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/db/marshal/ListType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java index 7ee5e79..316bfd6 100644 --- a/src/java/org/apache/cassandra/db/marshal/ListType.java +++ b/src/java/org/apache/cassandra/db/marshal/ListType.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.marshal; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; @@ -72,17 +73,26 @@ public class ListType<T> extends CollectionType<List<T>> public List<T> compose(ByteBuffer bytes) { - ByteBuffer input = bytes.duplicate(); - int n = input.getShort(); - List<T> l = new ArrayList<T>(n); - for (int i = 0; i < n; i++) + try { - int s = input.getShort(); - byte[] data = new byte[s]; - input.get(data); - l.add(elements.compose(ByteBuffer.wrap(data))); + ByteBuffer input = bytes.duplicate(); + int n = input.getShort(); + List<T> l = new ArrayList<T>(n); + for (int i = 0; i < n; i++) + { + int s = input.getShort(); + byte[] data = new byte[s]; + input.get(data); + ByteBuffer databb = ByteBuffer.wrap(data); + elements.validate(databb); + l.add(elements.compose(databb)); + } + return l; + } + catch (BufferUnderflowException e) + { + throw new MarshalException("Not enough bytes to read a list"); } - return l; } /** 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/db/marshal/MapType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java index 8e0b161..faf153f 100644 --- a/src/java/org/apache/cassandra/db/marshal/MapType.java +++ b/src/java/org/apache/cassandra/db/marshal/MapType.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.marshal; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; @@ -75,21 +76,33 @@ public class MapType<K, V> extends CollectionType<Map<K, V>> public Map<K, V> compose(ByteBuffer bytes) { - ByteBuffer input = bytes.duplicate(); - int n = input.getShort(); - Map<K, V> m = new LinkedHashMap<K, V>(n); - for (int i = 0; i < n; i++) + try { - int sk = input.getShort(); - byte[] datak = new byte[sk]; - input.get(datak); - - int sv = input.getShort(); - byte[] datav = new byte[sv]; - input.get(datav); - m.put(keys.compose(ByteBuffer.wrap(datak)), values.compose(ByteBuffer.wrap(datav))); + ByteBuffer input = bytes.duplicate(); + int n = input.getShort(); + Map<K, V> m = new LinkedHashMap<K, V>(n); + for (int i = 0; i < n; i++) + { + int sk = input.getShort(); + byte[] datak = new byte[sk]; + input.get(datak); + ByteBuffer kbb = ByteBuffer.wrap(datak); + keys.validate(kbb); + + int sv = input.getShort(); + byte[] datav = new byte[sv]; + input.get(datav); + ByteBuffer vbb = ByteBuffer.wrap(datav); + values.validate(vbb); + + m.put(keys.compose(kbb), values.compose(vbb)); + } + return m; + } + catch (BufferUnderflowException e) + { + throw new MarshalException("Not enough bytes to read a map"); } - return m; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b00f3a2/src/java/org/apache/cassandra/db/marshal/SetType.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java index 724754f..c76273a 100644 --- a/src/java/org/apache/cassandra/db/marshal/SetType.java +++ b/src/java/org/apache/cassandra/db/marshal/SetType.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.marshal; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.*; @@ -72,17 +73,26 @@ public class SetType<T> extends CollectionType<Set<T>> public Set<T> compose(ByteBuffer bytes) { - ByteBuffer input = bytes.duplicate(); - int n = input.getShort(); - Set<T> l = new LinkedHashSet<T>(n); - for (int i = 0; i < n; i++) + try { - int s = input.getShort(); - byte[] data = new byte[s]; - input.get(data); - l.add(elements.compose(ByteBuffer.wrap(data))); + ByteBuffer input = bytes.duplicate(); + int n = input.getShort(); + Set<T> l = new LinkedHashSet<T>(n); + for (int i = 0; i < n; i++) + { + int s = input.getShort(); + byte[] data = new byte[s]; + input.get(data); + ByteBuffer databb = ByteBuffer.wrap(data); + elements.validate(databb); + l.add(elements.compose(databb)); + } + return l; + } + catch (BufferUnderflowException e) + { + throw new MarshalException("Not enough bytes to read a set"); } - return l; } /**