Updated Branches: refs/heads/trunk afe4d555b -> bfd73beaf
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 2f9627b..8b9f7df 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -138,8 +138,10 @@ public class SelectStatement implements CQLStatement // Nothing to do, all validation has been done by RawStatement.prepare() } - public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws RequestExecutionException, RequestValidationException + public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { + ConsistencyLevel cl = options.getConsistency(); + List<ByteBuffer> variables = options.getValues(); if (cl == null) throw new InvalidRequestException("Invalid empty consistency level"); @@ -158,18 +160,19 @@ public class SelectStatement implements CQLStatement command = commands == null ? null : new Pageable.ReadCommands(commands); } + int pageSize = options.getPageSize(); // A count query will never be paged for the user, but we always page it internally to avoid OOM. // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default - if (parameters.isCount && pageSize < 0) + if (parameters.isCount && pageSize <= 0) pageSize = DEFAULT_COUNT_PAGE_SIZE; - if (pageSize < 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) + if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize)) { return execute(command, cl, variables, limit, now); } else { - QueryPager pager = QueryPagers.pager(command, cl, pagingState); + QueryPager pager = QueryPagers.pager(command, cl, options.getPagingState()); if (parameters.isCount) return pageCountQuery(pager, variables, pageSize, now); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index c66608b..318995e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@ -55,7 +55,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); } - public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws InvalidRequestException, TruncateException + public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException, TruncateException { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/UseStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java index c2e3c34..ab42fed 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.List; import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnauthorizedException; @@ -52,7 +53,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement { } - public ResultMessage execute(ConsistencyLevel cl, QueryState state, List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws InvalidRequestException + public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException { state.getClientState().setKeyspace(keyspace); return new ResultMessage.SetKeyspace(keyspace); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/db/ConsistencyLevel.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java index d642d08..956ab58 100644 --- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java +++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java @@ -48,7 +48,8 @@ public enum ConsistencyLevel ALL (5), LOCAL_QUORUM(6), EACH_QUORUM (7), - SERIAL (8); + SERIAL (8), + LOCAL_SERIAL(9); private static final Logger logger = LoggerFactory.getLogger(ConsistencyLevel.class); @@ -277,11 +278,13 @@ public enum ConsistencyLevel requireNetworkTopologyStrategy(keyspaceName); break; case SERIAL: + case LOCAL_SERIAL: throw new InvalidRequestException("You must use conditional updates for serializable writes"); } } - public void validateForCas(String keyspaceName) throws InvalidRequestException + // This is the same than validateForWrite really, but we include a slightly different error message for SERIAL/LOCAL_SERIAL + public void validateForCasCommit(String keyspaceName) throws InvalidRequestException { switch (this) { @@ -289,11 +292,23 @@ public enum ConsistencyLevel case EACH_QUORUM: requireNetworkTopologyStrategy(keyspaceName); break; - case ANY: - throw new InvalidRequestException("ANY is not supported with CAS. Use SERIAL if you mean, make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads"); + case SERIAL: + case LOCAL_SERIAL: + throw new InvalidRequestException(this + " is not supported as conditional update commit consistency. Use ANY if you mean \"make sure it is accepted but I don't care how many replicas commit it for non-SERIAL reads\""); } } + public void validateForCas() throws InvalidRequestException + { + if (!isSerialConsistency()) + throw new InvalidRequestException("Invalid consistency for conditional update. Must be one of SERIAL or LOCAL_SERIAL"); + } + + public boolean isSerialConsistency() + { + return this == SERIAL || this == LOCAL_SERIAL; + } + public void validateCounterForWrite(CFMetaData metadata) throws InvalidRequestException { if (this == ConsistencyLevel.ANY) @@ -304,7 +319,7 @@ public enum ConsistencyLevel { throw new InvalidRequestException("cannot achieve CL > CL.ONE without replicate_on_write on columnfamily " + metadata.cfName); } - else if (this == ConsistencyLevel.SERIAL) + else if (isSerialConsistency()) { throw new InvalidRequestException("Counter operations are inherently non-serializable"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 443d6b8..9b7da6d 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -29,6 +29,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang.StringUtils; @@ -195,16 +196,25 @@ public class StorageProxy implements StorageProxyMBean * the first live column of the row). * @param expected the expected column values. This can be null to check for existence (see {@code prefix}). * @param updates the value to insert if {@code expected matches the current values}. - * @param consistencyLevel the consistency for the operation. + * @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL. + * @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL. * * @return null if the operation succeeds in updating the row, or the current values for the columns contained in * expected (since, if the CAS doesn't succeed, it means the current value do not match the one in expected). If * expected == null and the CAS is unsuccessfull, the first live column of the CF is returned. */ - public static ColumnFamily cas(String keyspaceName, String cfName, ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily updates, ConsistencyLevel consistencyLevel) + public static ColumnFamily cas(String keyspaceName, + String cfName, + ByteBuffer key, + ColumnNameBuilder prefix, + ColumnFamily expected, + ColumnFamily updates, + ConsistencyLevel consistencyForPaxos, + ConsistencyLevel consistencyForCommit) throws UnavailableException, IsBootstrappingException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException { - consistencyLevel.validateForCas(keyspaceName); + consistencyForPaxos.validateForCas(); + consistencyForCommit.validateForCasCommit(keyspaceName); CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); @@ -213,11 +223,11 @@ public class StorageProxy implements StorageProxyMBean while (System.nanoTime() - start < timeout) { // for simplicity, we'll do a single liveness check at the start of each attempt - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos); List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; - UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants); + UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos); // read the current value and compare with expected Tracing.trace("Reading existing values for CAS precondition"); @@ -250,10 +260,10 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants)) { - if (consistencyLevel == ConsistencyLevel.SERIAL) + if (consistencyForCommit == ConsistencyLevel.ANY) sendCommit(proposal, liveEndpoints); else - commitPaxos(proposal, consistencyLevel); + commitPaxos(proposal, consistencyForCommit); Tracing.trace("CAS successful"); return null; } @@ -263,7 +273,7 @@ public class StorageProxy implements StorageProxyMBean // continue to retry } - throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1); + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, -1, -1); } private static boolean hasLiveColumns(ColumnFamily cf, long now) @@ -301,15 +311,35 @@ public class StorageProxy implements StorageProxyMBean return true; } - private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key) throws UnavailableException + private static Predicate<InetAddress> sameDCPredicateFor(final String dc) + { + final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + return new Predicate<InetAddress>() + { + public boolean apply(InetAddress host) + { + return dc.equals(snitch.getDatacenter(host)); + } + }; + } + + private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key, ConsistencyLevel consistencyForPaxos) throws UnavailableException { Token tk = StorageService.getPartitioner().getToken(key); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); + if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL) + { + // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only + String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc); + naturalEndpoints = ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc)); + pendingEndpoints = ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc)); + } int requiredParticipants = pendingEndpoints.size() + 1 + naturalEndpoints.size() / 2; // See CASSANDRA-833 List<InetAddress> liveEndpoints = ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), IAsyncCallback.isAlive)); if (liveEndpoints.size() < requiredParticipants) - throw new UnavailableException(ConsistencyLevel.SERIAL, requiredParticipants, liveEndpoints.size()); + throw new UnavailableException(consistencyForPaxos, requiredParticipants, liveEndpoints.size()); return Pair.create(liveEndpoints, requiredParticipants); } @@ -319,7 +349,7 @@ public class StorageProxy implements StorageProxyMBean * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of * nodes have seen the mostRecentCommit. Otherwise, return null. */ - private static UUID beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants) + private static UUID beginAndRepairPaxos(long start, ByteBuffer key, CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos) throws WriteTimeoutException { long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout()); @@ -376,7 +406,7 @@ public class StorageProxy implements StorageProxyMBean return ballot; } - throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, -1, -1); + throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, -1, -1); } /** @@ -1079,26 +1109,26 @@ public class StorageProxy implements StorageProxyMBean List<Row> rows = null; try { - if (consistency_level == ConsistencyLevel.SERIAL) + if (consistency_level.isSerialConsistency()) { // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read if (commands.size() > 1) - throw new InvalidRequestException("SERIAL consistency may only be requested for one row at a time"); + throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time"); ReadCommand command = commands.get(0); CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName); - Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key); + Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistency_level); List<InetAddress> liveEndpoints = p.left; int requiredParticipants = p.right; // does the work of applying in-progress writes; throws UAE or timeout if it can't try { - beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants); + beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistency_level); } catch (WriteTimeoutException e) { - throw new ReadTimeoutException(ConsistencyLevel.SERIAL, -1, -1, false); + throw new ReadTimeoutException(consistency_level, -1, -1, false); } rows = fetchRows(commands, ConsistencyLevel.QUORUM); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 15ab9d0..e9473d9 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -43,6 +43,7 @@ import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.cql.QueryProcessor; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.IDiskAtomFilter; @@ -691,7 +692,12 @@ public class CassandraServer implements Cassandra.Iface } } - public CASResult cas(ByteBuffer key, String column_family, List<Column> expected, List<Column> updates, ConsistencyLevel consistency_level) + public CASResult cas(ByteBuffer key, + String column_family, + List<Column> expected, + List<Column> updates, + ConsistencyLevel serial_consistency_level, + ConsistencyLevel commit_consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { if (startSessionIfRequested()) @@ -747,7 +753,14 @@ public class CassandraServer implements Cassandra.Iface } schedule(DatabaseDescriptor.getWriteRpcTimeout()); - ColumnFamily result = StorageProxy.cas(cState.getKeyspace(), column_family, key, null, cfExpected, cfUpdates, ThriftConversion.fromThrift(consistency_level)); + ColumnFamily result = StorageProxy.cas(cState.getKeyspace(), + column_family, + key, + null, + cfExpected, + cfUpdates, + ThriftConversion.fromThrift(serial_consistency_level), + ThriftConversion.fromThrift(commit_consistency_level)); return result == null ? new CASResult(true) : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(result.getSortedColumns(), System.currentTimeMillis())); @@ -2024,11 +2037,8 @@ public class CassandraServer implements Cassandra.Iface logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms()); return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, - ThriftConversion.fromThrift(cLevel), cState.getQueryState(), - bindVariables, - -1, - null).toThriftResult(); + new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); } catch (RequestExecutionException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 12c46dd..f0b700c 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -32,11 +32,13 @@ import java.util.Map; import com.google.common.base.Splitter; import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.transport.messages.*; import org.apache.cassandra.utils.Hex; +import org.apache.cassandra.utils.MD5Digest; import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; @@ -126,7 +128,7 @@ public class Client extends SimpleClient return null; } } - return new QueryMessage(query, ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), pageSize); + return new QueryMessage(query, new QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, pageSize, null, null)); } else if (msgType.equals("PREPARE")) { @@ -154,7 +156,7 @@ public class Client extends SimpleClient } values.add(bb); } - return new ExecuteMessage(id, values, ConsistencyLevel.ONE, -1); + return new ExecuteMessage(MD5Digest.wrap(id), new QueryOptions(ConsistencyLevel.ONE, values)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index df4f811..cfe1bab 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -32,6 +32,7 @@ import javax.net.ssl.SSLEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.transport.messages.CredentialsMessage; @@ -41,6 +42,7 @@ import org.apache.cassandra.transport.messages.PrepareMessage; import org.apache.cassandra.transport.messages.QueryMessage; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.transport.messages.StartupMessage; +import org.apache.cassandra.utils.MD5Digest; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; @@ -158,7 +160,7 @@ public class SimpleClient public ResultMessage execute(String query, List<ByteBuffer> values, ConsistencyLevel consistencyLevel) { - Message.Response msg = execute(new QueryMessage(query, consistencyLevel, values, -1)); + Message.Response msg = execute(new QueryMessage(query, new QueryOptions(consistencyLevel, values))); assert msg instanceof ResultMessage; return (ResultMessage)msg; } @@ -172,7 +174,7 @@ public class SimpleClient public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency) { - Message.Response msg = execute(new ExecuteMessage(statementId, values, consistency, -1)); + Message.Response msg = execute(new ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, values))); assert msg instanceof ResultMessage; return (ResultMessage)msg; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index f83df9d..c297426 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -26,9 +26,11 @@ import java.util.UUID; import com.google.common.collect.ImmutableMap; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.QueryState; @@ -40,131 +42,65 @@ import org.apache.cassandra.utils.UUIDGen; public class ExecuteMessage extends Message.Request { - public static enum Flag - { - // The order of that enum matters!! - PAGE_SIZE, - SKIP_METADATA, - PAGING_STATE; - - public static EnumSet<Flag> deserialize(int flags) - { - EnumSet<Flag> set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); - for (int n = 0; n < values.length; n++) - { - if ((flags & (1 << n)) != 0) - set.add(values[n]); - } - return set; - } - - public static int serialize(EnumSet<Flag> flags) - { - int i = 0; - for (Flag flag : flags) - i |= 1 << flag.ordinal(); - return i; - } - } - public static final Message.Codec<ExecuteMessage> codec = new Message.Codec<ExecuteMessage>() { public ExecuteMessage decode(ChannelBuffer body, int version) { byte[] id = CBUtil.readBytes(body); - int count = body.readUnsignedShort(); - List<ByteBuffer> values = new ArrayList<ByteBuffer>(count); - for (int i = 0; i < count; i++) - values.add(CBUtil.readValue(body)); - - ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); + if (version == 1) + { + int count = body.readUnsignedShort(); + List<ByteBuffer> values = new ArrayList<ByteBuffer>(count); + for (int i = 0; i < count; i++) + values.add(CBUtil.readValue(body)); - int resultPageSize = -1; - boolean skipMetadata = false; - PagingState pagingState = null; - if (version >= 2) + ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); + return new ExecuteMessage(id, values, consistency); + } + else { - EnumSet<Flag> flags = Flag.deserialize((int)body.readByte()); - if (flags.contains(Flag.PAGE_SIZE)) - resultPageSize = body.readInt(); - skipMetadata = flags.contains(Flag.SKIP_METADATA); - if (flags.contains(Flag.PAGING_STATE)) - pagingState = PagingState.deserialize(CBUtil.readValue(body)); + return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version)); } - return new ExecuteMessage(MD5Digest.wrap(id), values, consistency, resultPageSize, skipMetadata, pagingState); } public ChannelBuffer encode(ExecuteMessage msg, int version) { - // We have: - // - statementId - // - Number of values - // - The values - // - options - int vs = msg.values.size(); - - EnumSet<Flag> flags = EnumSet.noneOf(Flag.class); - if (msg.resultPageSize >= 0) - flags.add(Flag.PAGE_SIZE); - if (msg.skipMetadata) - flags.add(Flag.SKIP_METADATA); - if (msg.pagingState != null) - flags.add(Flag.PAGING_STATE); - - assert flags.isEmpty() || version >= 2; - - int nbBuff = 3; - if (version >= 2) + ChannelBuffer idBuffer = CBUtil.bytesToCB(msg.statementId.bytes); + ChannelBuffer optBuffer; + if (version == 1) { - nbBuff++; // the flags themselves - if (flags.contains(Flag.PAGE_SIZE)) - nbBuff++; - } - CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0)); - builder.add(CBUtil.bytesToCB(msg.statementId.bytes)); - builder.add(CBUtil.shortToCB(vs)); + CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, msg.options.getValues().size()); + builder.add(CBUtil.shortToCB(msg.options.getValues().size())); - // Values - for (ByteBuffer value : msg.values) - builder.addValue(value); + // Values + for (ByteBuffer value : msg.options.getValues()) + builder.addValue(value); - builder.add(CBUtil.consistencyLevelToCB(msg.consistency)); - - if (version >= 2) + builder.add(CBUtil.consistencyLevelToCB(msg.options.getConsistency())); + optBuffer = builder.build(); + } + else { - builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags))); - if (flags.contains(Flag.PAGE_SIZE)) - builder.add(CBUtil.intToCB(msg.resultPageSize)); - if (flags.contains(Flag.PAGING_STATE)) - builder.addValue(msg.pagingState == null ? null : msg.pagingState.serialize()); + optBuffer = QueryOptions.codec.encode(msg.options, version); } - return builder.build(); + return ChannelBuffers.wrappedBuffer(idBuffer, optBuffer); } }; public final MD5Digest statementId; - public final List<ByteBuffer> values; - public final ConsistencyLevel consistency; - public final int resultPageSize; - public final boolean skipMetadata; - public final PagingState pagingState; + public final QueryOptions options; - public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize) + public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, ConsistencyLevel consistency) { - this(MD5Digest.wrap(statementId), values, consistency, resultPageSize, false, null); + this(MD5Digest.wrap(statementId), new QueryOptions(consistency, values)); } - public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, ConsistencyLevel consistency, int resultPageSize, boolean skipMetadata, PagingState pagingState) + public ExecuteMessage(MD5Digest statementId, QueryOptions options) { super(Message.Type.EXECUTE); this.statementId = statementId; - this.values = values; - this.consistency = consistency; - this.resultPageSize = resultPageSize; - this.skipMetadata = skipMetadata; - this.pagingState = pagingState; + this.options = options; } public ChannelBuffer encode(int version) @@ -181,7 +117,7 @@ public class ExecuteMessage extends Message.Request if (statement == null) throw new PreparedQueryNotFoundException(statementId); - if (resultPageSize == 0) + if (options.getPageSize() == 0) throw new ProtocolException("The page size cannot be 0"); UUID tracingId = null; @@ -196,15 +132,15 @@ public class ExecuteMessage extends Message.Request state.createTracingSession(); ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); - if (resultPageSize > 0) - builder.put("page_size", Integer.toString(resultPageSize)); + if (options.getPageSize() > 0) + builder.put("page_size", Integer.toString(options.getPageSize())); // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support. Tracing.instance.begin("Execute CQL3 prepared query", builder.build()); } - Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize, pagingState); - if (skipMetadata && response instanceof ResultMessage.Rows) + Message.Response response = QueryProcessor.processPrepared(statement, state, options); + if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); if (tracingId != null) @@ -225,6 +161,6 @@ public class ExecuteMessage extends Message.Request @Override public String toString() { - return "EXECUTE " + statementId + " with " + values.size() + " values at consistency " + consistency; + return "EXECUTE " + statementId + " with " + options.getValues().size() + " values at consistency " + options.getConsistency(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 9e8050c..2b2583e 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -26,8 +26,10 @@ import java.util.UUID; import com.google.common.collect.ImmutableMap; import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.QueryState; @@ -41,147 +43,43 @@ import org.apache.cassandra.utils.UUIDGen; */ public class QueryMessage extends Message.Request { - public static enum Flag - { - // The order of that enum matters!! - PAGE_SIZE, - VALUES, - SKIP_METADATA, - PAGING_STATE; - - public static EnumSet<Flag> deserialize(int flags) - { - EnumSet<Flag> set = EnumSet.noneOf(Flag.class); - Flag[] values = Flag.values(); - for (int n = 0; n < values.length; n++) - { - if ((flags & (1 << n)) != 0) - set.add(values[n]); - } - return set; - } - - public static int serialize(EnumSet<Flag> flags) - { - int i = 0; - for (Flag flag : flags) - i |= 1 << flag.ordinal(); - return i; - } - } - public static final Message.Codec<QueryMessage> codec = new Message.Codec<QueryMessage>() { public QueryMessage decode(ChannelBuffer body, int version) { String query = CBUtil.readLongString(body); - ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); - - int resultPageSize = -1; - List<ByteBuffer> values = Collections.emptyList(); - boolean skipMetadata = false; - PagingState pagingState = null; - if (version >= 2) + if (version == 1) { - EnumSet<Flag> flags = Flag.deserialize((int)body.readByte()); - - if (flags.contains(Flag.PAGE_SIZE)) - resultPageSize = body.readInt(); - - if (flags.contains(Flag.VALUES)) - { - int paramCount = body.readUnsignedShort(); - values = paramCount == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(paramCount); - for (int i = 0; i < paramCount; i++) - values.add(CBUtil.readValue(body)); - } - - skipMetadata = flags.contains(Flag.SKIP_METADATA); - - if (flags.contains(Flag.PAGING_STATE)) - pagingState = PagingState.deserialize(CBUtil.readValue(body)); + ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); + return new QueryMessage(query, consistency); + } + else + { + return new QueryMessage(query, QueryOptions.codec.decode(body, version)); } - return new QueryMessage(query, consistency, values, resultPageSize, skipMetadata, pagingState); } public ChannelBuffer encode(QueryMessage msg, int version) { - // We have: - // - query - // - options - // * optional: - // - Number of values - // - The values - int vs = msg.values.size(); - - EnumSet<Flag> flags = EnumSet.noneOf(Flag.class); - if (msg.resultPageSize >= 0) - flags.add(Flag.PAGE_SIZE); - if (vs > 0) - flags.add(Flag.VALUES); - if (msg.skipMetadata) - flags.add(Flag.SKIP_METADATA); - if (msg.pagingState != null) - flags.add(Flag.PAGING_STATE); - - assert flags.isEmpty() || version >= 2 : "Version 1 of the protocol supports no option after the consistency level"; - - int nbBuff = 2; - if (version >= 2) - { - nbBuff++; // the flags themselves - if (flags.contains(Flag.PAGE_SIZE)) - nbBuff++; - if (flags.contains(Flag.VALUES)) - nbBuff++; - } - CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0)); - builder.add(CBUtil.longStringToCB(msg.query)); - builder.add(CBUtil.consistencyLevelToCB(msg.consistency)); - if (version >= 2) - { - builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags))); - if (flags.contains(Flag.PAGE_SIZE)) - builder.add(CBUtil.intToCB(msg.resultPageSize)); - if (flags.contains(Flag.VALUES)) - { - builder.add(CBUtil.shortToCB(vs)); - for (ByteBuffer value : msg.values) - builder.addValue(value); - } - if (flags.contains(Flag.PAGING_STATE)) - builder.addValue(msg.pagingState == null ? null : msg.pagingState.serialize()); - } - return builder.build(); + return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query), + (version == 1 ? CBUtil.consistencyLevelToCB(msg.options.getConsistency()) + : QueryOptions.codec.encode(msg.options, version))); } }; public final String query; - public final ConsistencyLevel consistency; - public final int resultPageSize; - public final List<ByteBuffer> values; - public final boolean skipMetadata; - public final PagingState pagingState; + public final QueryOptions options; public QueryMessage(String query, ConsistencyLevel consistency) { - this(query, consistency, Collections.<ByteBuffer>emptyList(), -1); - } - - public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize) - { - this(query, consistency, values, resultPageSize, false, null); + this(query, new QueryOptions(consistency, Collections.<ByteBuffer>emptyList())); } - public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize, boolean skipMetadata, PagingState pagingState) + public QueryMessage(String query, QueryOptions options) { super(Type.QUERY); this.query = query; - this.consistency = consistency; - this.resultPageSize = resultPageSize; - this.values = values; - this.skipMetadata = skipMetadata; - this.pagingState = pagingState; + this.options = options; } public ChannelBuffer encode(int version) @@ -193,7 +91,7 @@ public class QueryMessage extends Message.Request { try { - if (resultPageSize == 0) + if (options.getPageSize() == 0) throw new ProtocolException("The page size cannot be 0"); UUID tracingId = null; @@ -209,14 +107,14 @@ public class QueryMessage extends Message.Request ImmutableMap.Builder<String, String> builder = ImmutableMap.builder(); builder.put("query", query); - if (resultPageSize > 0) - builder.put("page_size", Integer.toString(resultPageSize)); + if (options.getPageSize() > 0) + builder.put("page_size", Integer.toString(options.getPageSize())); Tracing.instance.begin("Execute CQL3 query", builder.build()); } - Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize, pagingState); - if (skipMetadata && response instanceof ResultMessage.Rows) + Message.Response response = QueryProcessor.process(query, state, options); + if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); if (tracingId != null)