Updated Branches: refs/heads/trunk 314c8e85d -> 6d04ef038
Binary protocol: adds message to batch (prepared or not) statements patch by slebresne; reviewed by iamaleksey for CASSANDRA-4693 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d04ef03 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d04ef03 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d04ef03 Branch: refs/heads/trunk Commit: 6d04ef0383eb09716377f649b4c6f903624a31ac Parents: 314c8e8 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu May 16 16:37:42 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Mon May 27 09:47:14 2013 +0200 ---------------------------------------------------------------------- doc/native_protocol_v2.spec | 42 ++- .../org/apache/cassandra/cql3/QueryProcessor.java | 17 +- .../cassandra/cql3/statements/BatchStatement.java | 68 +++- .../org/apache/cassandra/transport/CBUtil.java | 7 + .../org/apache/cassandra/transport/Message.java | 3 +- .../cassandra/transport/messages/BatchMessage.java | 255 +++++++++++++++ .../cassandra/transport/messages/QueryMessage.java | 7 +- 7 files changed, 374 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/doc/native_protocol_v2.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec index a765700..20c0a80 100644 --- a/doc/native_protocol_v2.spec +++ b/doc/native_protocol_v2.spec @@ -20,7 +20,8 @@ Table of Contents 4.1.4. QUERY 4.1.5. PREPARE 4.1.6. EXECUTE - 4.1.7. REGISTER + 4.1.7. BATCH + 4.1.8. REGISTER 4.2. Responses 4.2.1. ERROR 4.2.2. READY @@ -159,6 +160,7 @@ Table of Contents 0x0A EXECUTE 0x0B REGISTER 0x0C EVENT + 0x0D BATCH Messages are described in Section 4. @@ -304,8 +306,36 @@ Table of Contents The response from the server will be a RESULT message. +4.1.7. 
BATCH -4.1.7. REGISTER + Allows executing a list of queries (prepared or not) as a batch (note that + only DML statements are accepted in a batch). The body of the message must + be: + <type><n><query_1>...<query_n><consistency> + where: + - <type> is a [byte] indicating the type of batch to use: + - If <type> == 0, the batch will be "logged". This is equivalent to a + normal CQL3 batch statement. + - If <type> == 1, the batch will be "unlogged". + - If <type> == 2, the batch will be a "counter" batch (and non-counter + statements will be rejected). + - <n> is a [short] indicating the number of following queries. + - <query_1>...<query_n> are the queries to execute. A <query_i> must be of the + form: + <kind><string_or_id><m><value_1>...<value_m> + where: + - <kind> is a [byte] indicating whether the following query is a prepared + one or not. <kind> value must be either 0 or 1. + - <string_or_id> depends on the value of <kind>. If <kind> == 0, it should be + a [long string] query string (as in QUERY, the query string might contain + bind markers). Otherwise (that is, if <kind> == 1), it should be a + [short bytes] representing a prepared query ID. + - <m> is a [short] indicating the number (possibly 0) of values following + for this query (independent of the batch-level <n> above). + - <value_1>...<value_m> are the [bytes] to use for bound variables. + - <consistency> is the [consistency] level for the operation. + + +4.1.8. REGISTER Register this connection to receive some type of events. The body of the message is a [string list] representing the event types to register to. See @@ -645,8 +675,10 @@ Table of Contents bytes] representing the unknown ID. 8.
Changes from v1 - * Protocol is versioned to allow old client connects to a newer server, if a newer - client connects to an older server, it needs to check if it gets a + * Protocol is versioned to allow old clients to connect to a newer server; if a + newer client connects to an older server, it needs to check if it gets a ProtocolException on connection and try connecting with a lower version. * A query can now have bind variables even though the statement is not - prepared. (see Section 4.1.4) \ No newline at end of file + prepared; see Section 4.1.4. + * A new BATCH message allows batching a set of queries (prepared or not); see + Section 4.1.7. http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index f7aebff..2edbb96 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -125,8 +125,8 @@ public class QueryProcessor throws RequestExecutionException, RequestValidationException { ClientState clientState = queryState.getClientState(); - statement.validate(clientState); statement.checkAccess(clientState); + statement.validate(clientState); ResultMessage result = statement.execute(cl, queryState, variables); return result == null ?
new ResultMessage.Void() : result; } @@ -147,6 +147,11 @@ public class QueryProcessor return processStatement(prepared, cl, queryState, variables); } + public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException + { + return getStatement(queryStr, queryState.getClientState()).statement; + } + public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException { try @@ -261,6 +266,16 @@ public class QueryProcessor return processStatement(statement, cl, queryState, variables); } + public static ResultMessage processBatch(BatchStatement batch, ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) + throws RequestExecutionException, RequestValidationException + { + ClientState clientState = queryState.getClientState(); + batch.checkAccess(clientState); + batch.validate(clientState); + batch.executeWithPerStatementVariables(cl, queryState, variables); + return new ResultMessage.Void(); + } + private static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index c7cd9ae..d6d0e16 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -98,25 +98,47 @@ public class BatchStatement implements CQLStatement { Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>(); for (ModificationStatement statement : statements) + addStatementMutations(statement, variables, local, cl, now, mutations); + + return 
mutations.values(); + } + + private Collection<? extends IMutation> getMutations(List<List<ByteBuffer>> variables, ConsistencyLevel cl, long now) + throws RequestExecutionException, RequestValidationException + { + Map<Pair<String, ByteBuffer>, IMutation> mutations = new HashMap<Pair<String, ByteBuffer>, IMutation>(); + for (int i = 0; i < statements.size(); i++) { - // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true)) + ModificationStatement statement = statements.get(i); + List<ByteBuffer> statementVariables = variables.get(i); + addStatementMutations(statement, statementVariables, false, cl, now, mutations); + } + return mutations.values(); + } + + private void addStatementMutations(ModificationStatement statement, + List<ByteBuffer> variables, + boolean local, + ConsistencyLevel cl, + long now, + Map<Pair<String, ByteBuffer>, IMutation> mutations) + throws RequestExecutionException, RequestValidationException + { + // Group mutation together, otherwise they won't get applied atomically + for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true)) + { + Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key()); + IMutation existing = mutations.get(key); + + if (existing == null) { - Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key()); - IMutation existing = mutations.get(key); - - if (existing == null) - { - mutations.put(key, m); - } - else - { - existing.addAll(m); - } + mutations.put(key, m); + } + else + { + existing.addAll(m); } } - - return mutations.values(); } public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException @@ -124,10 +146,22 @@ public class BatchStatement implements CQLStatement if (cl == null) throw new InvalidRequestException("Invalid empty consistency level"); - Collection<? 
extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp()); + execute(getMutations(variables, false, cl, queryState.getTimestamp()), cl); + return null; + } + + public void executeWithPerStatementVariables(ConsistencyLevel cl, QueryState queryState, List<List<ByteBuffer>> variables) throws RequestExecutionException, RequestValidationException + { + if (cl == null) + throw new InvalidRequestException("Invalid empty consistency level"); + + execute(getMutations(variables, cl, queryState.getTimestamp()), cl); + } + + private void execute(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException + { boolean mutateAtomic = (type == Type.LOGGED && mutations.size() > 1); StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic); - return null; } public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 2ad8c72..897a3d9 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -102,6 +102,13 @@ public abstract class CBUtil return cb; } + public static ChannelBuffer byteToCB(byte b) + { + ChannelBuffer cb = ChannelBuffers.buffer(1); + cb.writeByte(b); + return cb; + } + public static ChannelBuffer intToCB(int i) { ChannelBuffer cb = ChannelBuffers.buffer(4); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java index 3121ce9..e57da51 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -69,7 +69,8 @@ public abstract class Message PREPARE (9, Direction.REQUEST, PrepareMessage.codec), EXECUTE (10, Direction.REQUEST, ExecuteMessage.codec), REGISTER (11, Direction.REQUEST, RegisterMessage.codec), - EVENT (12, Direction.RESPONSE, EventMessage.codec); + EVENT (12, Direction.RESPONSE, EventMessage.codec), + BATCH (13, Direction.REQUEST, BatchMessage.codec); public final int opcode; public final Direction direction; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java new file mode 100644 index 0000000..3bec918 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.transport.messages; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import org.apache.cassandra.cql3.Attributes; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.cql3.statements.ModificationStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.*; +import org.apache.cassandra.utils.MD5Digest; +import org.apache.cassandra.utils.UUIDGen; + +public class BatchMessage extends Message.Request +{ + public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>() + { + public BatchMessage decode(ChannelBuffer body, int version) + { + if (version == 1) + throw new ProtocolException("BATCH messages are not support in version 1 of the protocol"); + + byte type = body.readByte(); + int n = body.readUnsignedShort(); + List<Object> queryOrIds = new ArrayList<Object>(n); + List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n); + for (int i = 0; i < n; i++) + { + byte kind = body.readByte(); + if (kind == 0) + queryOrIds.add(CBUtil.readLongString(body)); + else if (kind == 1) + queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body))); + else + throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind); + + int count = body.readUnsignedShort(); + List<ByteBuffer> values = count == 0 ? 
Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(count); + for (int j = 0; j < count; j++) + values.add(CBUtil.readValue(body)); + variables.add(values); + } + ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); + return new BatchMessage(toType(type), queryOrIds, variables, consistency); + } + + public ChannelBuffer encode(BatchMessage msg) + { + // We have: + // - type + // - Number of queries + // - For each query: + // - kind + // - string or id + // - value count + // - values + // - consistency + int queries = msg.queryOrIdList.size(); + int totalValues = count(msg.values); + + ChannelBuffer header = ChannelBuffers.buffer(3); + header.writeByte(fromType(msg.type)); + header.writeShort(queries); + + CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2 + queries * 3, 0, totalValues); + builder.add(header); + for (int i = 0; i < queries; i++) + { + Object q = msg.queryOrIdList.get(i); + builder.add(CBUtil.byteToCB((byte)(q instanceof String ? 0 : 1))); + if (q instanceof String) + builder.add(CBUtil.longStringToCB((String)q)); + else + builder.add(CBUtil.bytesToCB(((MD5Digest)q).bytes)); + List<ByteBuffer> queryValues = msg.values.get(i); + builder.add(CBUtil.shortToCB(queryValues.size())); + for (ByteBuffer value : queryValues) + builder.addValue(value); + } + + builder.add(CBUtil.consistencyLevelToCB(msg.consistency)); + return builder.build(); + } + + private BatchStatement.Type toType(byte b) + { + if (b == 0) + return BatchStatement.Type.LOGGED; + else if (b == 1) + return BatchStatement.Type.UNLOGGED; + else if (b == 2) + return BatchStatement.Type.COUNTER; + else + throw new ProtocolException("Invalid BATCH message type " + b); + } + + private byte fromType(BatchStatement.Type type) + { + switch (type) + { + case LOGGED: return 0; + case UNLOGGED: return 1; + case COUNTER: return 2; + default: + throw new AssertionError(); + } + } + + private int count(List<List<ByteBuffer>> values) + { + int count = 0; + for 
(List<ByteBuffer> l : values) + count += l.size(); + return count; + } + }; + + public final BatchStatement.Type type; + public final List<Object> queryOrIdList; + public final List<List<ByteBuffer>> values; + public final ConsistencyLevel consistency; + + public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency) + { + super(Message.Type.BATCH); + this.type = type; + this.queryOrIdList = queryOrIdList; + this.values = values; + this.consistency = consistency; + } + + public ChannelBuffer encode() + { + return codec.encode(this); + } + + public Message.Response execute(QueryState state) + { + try + { + UUID tracingId = null; + if (isTracingRequested()) + { + tracingId = UUIDGen.getTimeUUID(); + state.prepareTracingSession(tracingId); + } + + if (state.traceNextQuery()) + { + state.createTracingSession(); + // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. + Tracing.instance().begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap()); + } + + List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size()); + for (int i = 0; i < queryOrIdList.size(); i++) + { + Object query = queryOrIdList.get(i); + CQLStatement statement; + if (query instanceof String) + { + statement = QueryProcessor.parseStatement((String)query, state); + } + else + { + statement = QueryProcessor.getPrepared((MD5Digest)query); + if (statement == null) + throw new PreparedQueryNotFoundException((MD5Digest)query); + } + + List<ByteBuffer> queryValues = values.get(i); + if (queryValues.size() != statement.getBoundsTerms()) + throw new InvalidRequestException(String.format("There were %d markers(?) 
in CQL but %d bound variables", + statement.getBoundsTerms(), + queryValues.size())); + if (!(statement instanceof ModificationStatement)) + throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed."); + + ModificationStatement mst = (ModificationStatement)statement; + if (mst.isCounter()) + { + if (type != BatchStatement.Type.COUNTER) + throw new InvalidRequestException("Cannot include counter statement in a non-counter batch"); + } + else + { + if (type == BatchStatement.Type.COUNTER) + throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); + } + statements.add(mst); + } + + // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor + // (and no value would be really correct, so we prefer passing a clearly wrong one). + BatchStatement batch = new BatchStatement(-1, type, statements, new Attributes()); + Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values); + + if (tracingId != null) + response.setTracingId(tracingId); + + return response; + } + catch (Exception e) + { + return ErrorMessage.fromException(e); + } + finally + { + Tracing.instance().stopSession(); + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append("BATCH of ["); + for (int i = 0; i < queryOrIdList.size(); i++) + { + if (i > 0) sb.append(", "); + sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values"); + } + sb.append("] at consistency ").append(consistency); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d04ef03/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java 
b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index d8d7fa2..9735654 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -44,13 +44,18 @@ public class QueryMessage extends Message.Request { String query = CBUtil.readLongString(body); ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); - List<ByteBuffer> values = new ArrayList<ByteBuffer>(); + List<ByteBuffer> values; if (body.readable()) { int paramCount = body.readUnsignedShort(); + values = paramCount == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(paramCount); for (int i = 0; i < paramCount; i++) values.add(CBUtil.readValue(body)); } + else + { + values = Collections.emptyList(); + } return new QueryMessage(query, values, consistency); }