Updated Branches:
  refs/heads/trunk afe4d555b -> bfd73beaf

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2f9627b..8b9f7df 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -138,8 +138,10 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by 
RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(ConsistencyLevel cl, QueryState state, 
List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws 
RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(QueryState state, QueryOptions options) 
throws RequestExecutionException, RequestValidationException
     {
+        ConsistencyLevel cl = options.getConsistency();
+        List<ByteBuffer> variables = options.getValues();
         if (cl == null)
             throw new InvalidRequestException("Invalid empty consistency 
level");
 
@@ -158,18 +160,19 @@ public class SelectStatement implements CQLStatement
             command = commands == null ? null : new 
Pageable.ReadCommands(commands);
         }
 
+        int pageSize = options.getPageSize();
         // A count query will never be paged for the user, but we always page 
it internally to avoid OOM.
         // If we user provided a pageSize we'll use that to page internally 
(because why not), otherwise we use our default
-        if (parameters.isCount && pageSize < 0)
+        if (parameters.isCount && pageSize <= 0)
             pageSize = DEFAULT_COUNT_PAGE_SIZE;
 
-        if (pageSize < 0 || command == null || 
!QueryPagers.mayNeedPaging(command, pageSize))
+        if (pageSize <= 0 || command == null || 
!QueryPagers.mayNeedPaging(command, pageSize))
         {
             return execute(command, cl, variables, limit, now);
         }
         else
         {
-            QueryPager pager = QueryPagers.pager(command, cl, pagingState);
+            QueryPager pager = QueryPagers.pager(command, cl, 
options.getPagingState());
             if (parameters.isCount)
                 return pageCountQuery(pager, variables, pageSize, now);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index c66608b..318995e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -55,7 +55,7 @@ public class TruncateStatement extends CFStatement implements 
CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, 
List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws 
InvalidRequestException, TruncateException
+    public ResultMessage execute(QueryState state, QueryOptions options) 
throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index c2e3c34..ab42fed 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -52,7 +53,7 @@ public class UseStatement extends ParsedStatement implements 
CQLStatement
     {
     }
 
-    public ResultMessage execute(ConsistencyLevel cl, QueryState state, 
List<ByteBuffer> variables, int pageSize, PagingState pagingState) throws 
InvalidRequestException
+    public ResultMessage execute(QueryState state, QueryOptions options) 
throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java 
b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index d642d08..956ab58 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -48,7 +48,8 @@ public enum ConsistencyLevel
     ALL         (5),
     LOCAL_QUORUM(6),
     EACH_QUORUM (7),
-    SERIAL      (8);
+    SERIAL      (8),
+    LOCAL_SERIAL(9);
 
     private static final Logger logger = 
LoggerFactory.getLogger(ConsistencyLevel.class);
 
@@ -277,11 +278,13 @@ public enum ConsistencyLevel
                 requireNetworkTopologyStrategy(keyspaceName);
                 break;
             case SERIAL:
+            case LOCAL_SERIAL:
                 throw new InvalidRequestException("You must use conditional 
updates for serializable writes");
         }
     }
 
-    public void validateForCas(String keyspaceName) throws 
InvalidRequestException
+    // This is the same as validateForWrite really, but we include a 
slightly different error message for SERIAL/LOCAL_SERIAL
+    public void validateForCasCommit(String keyspaceName) throws 
InvalidRequestException
     {
         switch (this)
         {
@@ -289,11 +292,23 @@ public enum ConsistencyLevel
             case EACH_QUORUM:
                 requireNetworkTopologyStrategy(keyspaceName);
                 break;
-            case ANY:
-                throw new InvalidRequestException("ANY is not supported with 
CAS. Use SERIAL if you mean, make sure it is accepted but I don't care how many 
replicas commit it for non-SERIAL reads");
+            case SERIAL:
+            case LOCAL_SERIAL:
+                throw new InvalidRequestException(this + " is not supported as 
conditional update commit consistency. Use ANY if you mean \"make sure it is 
accepted but I don't care how many replicas commit it for non-SERIAL reads\"");
         }
     }
 
+    public void validateForCas() throws InvalidRequestException
+    {
+        if (!isSerialConsistency())
+            throw new InvalidRequestException("Invalid consistency for 
conditional update. Must be one of SERIAL or LOCAL_SERIAL");
+    }
+
+    public boolean isSerialConsistency()
+    {
+        return this == SERIAL || this == LOCAL_SERIAL;
+    }
+
     public void validateCounterForWrite(CFMetaData metadata) throws 
InvalidRequestException
     {
         if (this == ConsistencyLevel.ANY)
@@ -304,7 +319,7 @@ public enum ConsistencyLevel
         {
             throw new InvalidRequestException("cannot achieve CL > CL.ONE 
without replicate_on_write on columnfamily " + metadata.cfName);
         }
-        else if (this == ConsistencyLevel.SERIAL)
+        else if (isSerialConsistency())
         {
             throw new InvalidRequestException("Counter operations are 
inherently non-serializable");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 443d6b8..9b7da6d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -29,6 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang.StringUtils;
@@ -195,16 +196,25 @@ public class StorageProxy implements StorageProxyMBean
      * the first live column of the row).
      * @param expected the expected column values. This can be null to check 
for existence (see {@code prefix}).
      * @param updates the value to insert if {@code expected matches the 
current values}.
-     * @param consistencyLevel the consistency for the operation.
+     * @param consistencyForPaxos the consistency for the paxos prepare and 
propose rounds. This can only be either SERIAL or LOCAL_SERIAL.
+     * @param consistencyForCommit the consistency for the write done during the 
commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
      *
      * @return null if the operation succeeds in updating the row, or the 
current values for the columns contained in
      * expected (since, if the CAS doesn't succeed, it means the current value 
do not match the one in expected). If
      * expected == null and the CAS is unsuccessfull, the first live column of 
the CF is returned.
      */
-    public static ColumnFamily cas(String keyspaceName, String cfName, 
ByteBuffer key, ColumnNameBuilder prefix, ColumnFamily expected, ColumnFamily 
updates, ConsistencyLevel consistencyLevel)
+    public static ColumnFamily cas(String keyspaceName,
+                                   String cfName,
+                                   ByteBuffer key,
+                                   ColumnNameBuilder prefix,
+                                   ColumnFamily expected,
+                                   ColumnFamily updates,
+                                   ConsistencyLevel consistencyForPaxos,
+                                   ConsistencyLevel consistencyForCommit)
     throws UnavailableException, IsBootstrappingException, 
ReadTimeoutException, WriteTimeoutException, InvalidRequestException
     {
-        consistencyLevel.validateForCas(keyspaceName);
+        consistencyForPaxos.validateForCas();
+        consistencyForCommit.validateForCasCommit(keyspaceName);
 
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
 
@@ -213,11 +223,11 @@ public class StorageProxy implements StorageProxyMBean
         while (System.nanoTime() - start < timeout)
         {
             // for simplicity, we'll do a single liveness check at the start 
of each attempt
-            Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(keyspaceName, key);
+            Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(keyspaceName, key, consistencyForPaxos);
             List<InetAddress> liveEndpoints = p.left;
             int requiredParticipants = p.right;
 
-            UUID ballot = beginAndRepairPaxos(start, key, metadata, 
liveEndpoints, requiredParticipants);
+            UUID ballot = beginAndRepairPaxos(start, key, metadata, 
liveEndpoints, requiredParticipants, consistencyForPaxos);
 
             // read the current value and compare with expected
             Tracing.trace("Reading existing values for CAS precondition");
@@ -250,10 +260,10 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("CAS precondition is met; proposing client-requested 
updates for {}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants))
             {
-                if (consistencyLevel == ConsistencyLevel.SERIAL)
+                if (consistencyForCommit == ConsistencyLevel.ANY)
                     sendCommit(proposal, liveEndpoints);
                 else
-                    commitPaxos(proposal, consistencyLevel);
+                    commitPaxos(proposal, consistencyForCommit);
                 Tracing.trace("CAS successful");
                 return null;
             }
@@ -263,7 +273,7 @@ public class StorageProxy implements StorageProxyMBean
             // continue to retry
         }
 
-        throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, -1, -1);
+        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
-1, -1);
     }
 
     private static boolean hasLiveColumns(ColumnFamily cf, long now)
@@ -301,15 +311,35 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
-    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(String keyspaceName, ByteBuffer key) throws 
UnavailableException
+    private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
+    {
+        final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        return new Predicate<InetAddress>()
+        {
+            public boolean apply(InetAddress host)
+            {
+                return dc.equals(snitch.getDatacenter(host));
+            }
+        };
+    }
+
+    private static Pair<List<InetAddress>, Integer> 
getPaxosParticipants(String keyspaceName, ByteBuffer key, ConsistencyLevel 
consistencyForPaxos) throws UnavailableException
     {
         Token tk = StorageService.getPartitioner().getToken(key);
         List<InetAddress> naturalEndpoints = 
StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = 
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, 
keyspaceName);
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // Restrict naturalEndpoints and pendingEndpoints to node in the 
local DC only
+            String localDc = 
DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+            Predicate<InetAddress> isLocalDc = sameDCPredicateFor(localDc);
+            naturalEndpoints = 
ImmutableList.copyOf(Iterables.filter(naturalEndpoints, isLocalDc));
+            pendingEndpoints = 
ImmutableList.copyOf(Iterables.filter(pendingEndpoints, isLocalDc));
+        }
         int requiredParticipants = pendingEndpoints.size() + 1 + 
naturalEndpoints.size() / 2; // See CASSANDRA-833
         List<InetAddress> liveEndpoints = 
ImmutableList.copyOf(Iterables.filter(Iterables.concat(naturalEndpoints, 
pendingEndpoints), IAsyncCallback.isAlive));
         if (liveEndpoints.size() < requiredParticipants)
-            throw new UnavailableException(ConsistencyLevel.SERIAL, 
requiredParticipants, liveEndpoints.size());
+            throw new UnavailableException(consistencyForPaxos, 
requiredParticipants, liveEndpoints.size());
         return Pair.create(liveEndpoints, requiredParticipants);
     }
 
@@ -319,7 +349,7 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress 
requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static UUID beginAndRepairPaxos(long start, ByteBuffer key, 
CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants)
+    private static UUID beginAndRepairPaxos(long start, ByteBuffer key, 
CFMetaData metadata, List<InetAddress> liveEndpoints, int requiredParticipants, 
ConsistencyLevel consistencyForPaxos)
     throws WriteTimeoutException
     {
         long timeout = 
TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
@@ -376,7 +406,7 @@ public class StorageProxy implements StorageProxyMBean
             return ballot;
         }
 
-        throw new WriteTimeoutException(WriteType.CAS, 
ConsistencyLevel.SERIAL, -1, -1);
+        throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 
-1, -1);
     }
 
     /**
@@ -1079,26 +1109,26 @@ public class StorageProxy implements StorageProxyMBean
         List<Row> rows = null;
         try
         {
-            if (consistency_level == ConsistencyLevel.SERIAL)
+            if (consistency_level.isSerialConsistency())
             {
                 // make sure any in-progress paxos writes are done (i.e., 
committed to a majority of replicas), before performing a quorum read
                 if (commands.size() > 1)
-                    throw new InvalidRequestException("SERIAL consistency may 
only be requested for one row at a time");
+                    throw new InvalidRequestException("SERIAL/LOCAL_SERIAL 
consistency may only be requested for one row at a time");
                 ReadCommand command = commands.get(0);
 
                 CFMetaData metadata = 
Schema.instance.getCFMetaData(command.ksName, command.cfName);
-                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(command.ksName, command.key);
+                Pair<List<InetAddress>, Integer> p = 
getPaxosParticipants(command.ksName, command.key, consistency_level);
                 List<InetAddress> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
                 // does the work of applying in-progress writes; throws UAE or 
timeout if it can't
                 try
                 {
-                    beginAndRepairPaxos(start, command.key, metadata, 
liveEndpoints, requiredParticipants);
+                    beginAndRepairPaxos(start, command.key, metadata, 
liveEndpoints, requiredParticipants, consistency_level);
                 }
                 catch (WriteTimeoutException e)
                 {
-                    throw new ReadTimeoutException(ConsistencyLevel.SERIAL, 
-1, -1, false);
+                    throw new ReadTimeoutException(consistency_level, -1, -1, 
false);
                 }
 
                 rows = fetchRows(commands, ConsistencyLevel.QUORUM);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 15ab9d0..e9473d9 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.cql.CQLStatement;
 import org.apache.cassandra.cql.QueryProcessor;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
@@ -691,7 +692,12 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    public CASResult cas(ByteBuffer key, String column_family, List<Column> 
expected, List<Column> updates, ConsistencyLevel consistency_level)
+    public CASResult cas(ByteBuffer key,
+                         String column_family,
+                         List<Column> expected,
+                         List<Column> updates,
+                         ConsistencyLevel serial_consistency_level,
+                         ConsistencyLevel commit_consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         if (startSessionIfRequested())
@@ -747,7 +753,14 @@ public class CassandraServer implements Cassandra.Iface
             }
 
             schedule(DatabaseDescriptor.getWriteRpcTimeout());
-            ColumnFamily result = StorageProxy.cas(cState.getKeyspace(), 
column_family, key, null, cfExpected, cfUpdates, 
ThriftConversion.fromThrift(consistency_level));
+            ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
+                                                   column_family,
+                                                   key,
+                                                   null,
+                                                   cfExpected,
+                                                   cfUpdates,
+                                                   
ThriftConversion.fromThrift(serial_consistency_level),
+                                                   
ThriftConversion.fromThrift(commit_consistency_level));
             return result == null
                  ? new CASResult(true)
                  : new 
CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(result.getSortedColumns(),
 System.currentTimeMillis()));
@@ -2024,11 +2037,8 @@ public class CassandraServer implements Cassandra.Iface
             logger.trace("Retrieved prepared statement #{} with {} bind 
markers", itemId, statement.getBoundsTerms());
 
             return 
org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
-                                                                            
ThriftConversion.fromThrift(cLevel),
                                                                             
cState.getQueryState(),
-                                                                            
bindVariables,
-                                                                            -1,
-                                                                            
null).toThriftResult();
+                                                                            
new QueryOptions(ThriftConversion.fromThrift(cLevel), 
bindVariables)).toThriftResult();
         }
         catch (RequestExecutionException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java 
b/src/java/org/apache/cassandra/transport/Client.java
index 12c46dd..f0b700c 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -32,11 +32,13 @@ import java.util.Map;
 import com.google.common.base.Splitter;
 
 import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.MD5Digest;
 
 import static 
org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions;
 
@@ -126,7 +128,7 @@ public class Client extends SimpleClient
                     return null;
                 }
             }
-            return new QueryMessage(query, ConsistencyLevel.ONE, 
Collections.<ByteBuffer>emptyList(), pageSize);
+            return new QueryMessage(query, new 
QueryOptions(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList(), false, 
pageSize, null, null));
         }
         else if (msgType.equals("PREPARE"))
         {
@@ -154,7 +156,7 @@ public class Client extends SimpleClient
                     }
                     values.add(bb);
                 }
-                return new ExecuteMessage(id, values, ConsistencyLevel.ONE, 
-1);
+                return new ExecuteMessage(MD5Digest.wrap(id), new 
QueryOptions(ConsistencyLevel.ONE, values));
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index df4f811..cfe1bab 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -32,6 +32,7 @@ import javax.net.ssl.SSLEngine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.transport.messages.CredentialsMessage;
@@ -41,6 +42,7 @@ import org.apache.cassandra.transport.messages.PrepareMessage;
 import org.apache.cassandra.transport.messages.QueryMessage;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.transport.messages.StartupMessage;
+import org.apache.cassandra.utils.MD5Digest;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
@@ -158,7 +160,7 @@ public class SimpleClient
 
     public ResultMessage execute(String query, List<ByteBuffer> values, 
ConsistencyLevel consistencyLevel)
     {
-        Message.Response msg = execute(new QueryMessage(query, 
consistencyLevel, values, -1));
+        Message.Response msg = execute(new QueryMessage(query, new 
QueryOptions(consistencyLevel, values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }
@@ -172,7 +174,7 @@ public class SimpleClient
 
     public ResultMessage executePrepared(byte[] statementId, List<ByteBuffer> 
values, ConsistencyLevel consistency)
     {
-        Message.Response msg = execute(new ExecuteMessage(statementId, values, 
consistency, -1));
+        Message.Response msg = execute(new 
ExecuteMessage(MD5Digest.wrap(statementId), new QueryOptions(consistency, 
values)));
         assert msg instanceof ResultMessage;
         return (ResultMessage)msg;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index f83df9d..c297426 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -26,9 +26,11 @@ import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 import org.apache.cassandra.service.QueryState;
@@ -40,131 +42,65 @@ import org.apache.cassandra.utils.UUIDGen;
 
 public class ExecuteMessage extends Message.Request
 {
-    public static enum Flag
-    {
-        // The order of that enum matters!!
-        PAGE_SIZE,
-        SKIP_METADATA,
-        PAGING_STATE;
-
-        public static EnumSet<Flag> deserialize(int flags)
-        {
-            EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
-            Flag[] values = Flag.values();
-            for (int n = 0; n < values.length; n++)
-            {
-                if ((flags & (1 << n)) != 0)
-                    set.add(values[n]);
-            }
-            return set;
-        }
-
-        public static int serialize(EnumSet<Flag> flags)
-        {
-            int i = 0;
-            for (Flag flag : flags)
-                i |= 1 << flag.ordinal();
-            return i;
-        }
-    }
-
     public static final Message.Codec<ExecuteMessage> codec = new 
Message.Codec<ExecuteMessage>()
     {
         public ExecuteMessage decode(ChannelBuffer body, int version)
         {
             byte[] id = CBUtil.readBytes(body);
 
-            int count = body.readUnsignedShort();
-            List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
-            for (int i = 0; i < count; i++)
-                values.add(CBUtil.readValue(body));
-
-            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+            if (version == 1)
+            {
+                int count = body.readUnsignedShort();
+                List<ByteBuffer> values = new ArrayList<ByteBuffer>(count);
+                for (int i = 0; i < count; i++)
+                    values.add(CBUtil.readValue(body));
 
-            int resultPageSize = -1;
-            boolean skipMetadata = false;
-            PagingState pagingState = null;
-            if (version >= 2)
+                ConsistencyLevel consistency = 
CBUtil.readConsistencyLevel(body);
+                return new ExecuteMessage(id, values, consistency);
+            }
+            else
             {
-                EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
-                if (flags.contains(Flag.PAGE_SIZE))
-                    resultPageSize = body.readInt();
-                skipMetadata = flags.contains(Flag.SKIP_METADATA);
-                if (flags.contains(Flag.PAGING_STATE))
-                    pagingState = 
PagingState.deserialize(CBUtil.readValue(body));
+                return new ExecuteMessage(MD5Digest.wrap(id), 
QueryOptions.codec.decode(body, version));
             }
-            return new ExecuteMessage(MD5Digest.wrap(id), values, consistency, 
resultPageSize, skipMetadata, pagingState);
         }
 
         public ChannelBuffer encode(ExecuteMessage msg, int version)
         {
-            // We have:
-            //   - statementId
-            //   - Number of values
-            //   - The values
-            //   - options
-            int vs = msg.values.size();
-
-            EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
-            if (msg.resultPageSize >= 0)
-                flags.add(Flag.PAGE_SIZE);
-            if (msg.skipMetadata)
-                flags.add(Flag.SKIP_METADATA);
-            if (msg.pagingState != null)
-                flags.add(Flag.PAGING_STATE);
-
-            assert flags.isEmpty() || version >= 2;
-
-            int nbBuff = 3;
-            if (version >= 2)
+            ChannelBuffer idBuffer = CBUtil.bytesToCB(msg.statementId.bytes);
+            ChannelBuffer optBuffer;
+            if (version == 1)
             {
-                nbBuff++; // the flags themselves
-                if (flags.contains(Flag.PAGE_SIZE))
-                    nbBuff++;
-            }
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, 
vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
-            builder.add(CBUtil.bytesToCB(msg.statementId.bytes));
-            builder.add(CBUtil.shortToCB(vs));
+                CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(2, 0, 
msg.options.getValues().size());
+                builder.add(CBUtil.shortToCB(msg.options.getValues().size()));
 
-            // Values
-            for (ByteBuffer value : msg.values)
-                builder.addValue(value);
+                // Values
+                for (ByteBuffer value : msg.options.getValues())
+                    builder.addValue(value);
 
-            builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
-
-            if (version >= 2)
+                
builder.add(CBUtil.consistencyLevelToCB(msg.options.getConsistency()));
+                optBuffer = builder.build();
+            }
+            else
             {
-                builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags)));
-                if (flags.contains(Flag.PAGE_SIZE))
-                    builder.add(CBUtil.intToCB(msg.resultPageSize));
-                if (flags.contains(Flag.PAGING_STATE))
-                    builder.addValue(msg.pagingState == null ? null : 
msg.pagingState.serialize());
+                optBuffer = QueryOptions.codec.encode(msg.options, version);
             }
-            return builder.build();
+            return ChannelBuffers.wrappedBuffer(idBuffer, optBuffer);
         }
     };
 
     public final MD5Digest statementId;
-    public final List<ByteBuffer> values;
-    public final ConsistencyLevel consistency;
-    public final int resultPageSize;
-    public final boolean skipMetadata;
-    public final PagingState pagingState;
+    public final QueryOptions options;
 
-    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, 
ConsistencyLevel consistency, int resultPageSize)
+    public ExecuteMessage(byte[] statementId, List<ByteBuffer> values, 
ConsistencyLevel consistency)
     {
-        this(MD5Digest.wrap(statementId), values, consistency, resultPageSize, 
false, null);
+        this(MD5Digest.wrap(statementId), new QueryOptions(consistency, 
values));
     }
 
-    public ExecuteMessage(MD5Digest statementId, List<ByteBuffer> values, 
ConsistencyLevel consistency, int resultPageSize, boolean skipMetadata, 
PagingState pagingState)
+    public ExecuteMessage(MD5Digest statementId, QueryOptions options)
     {
         super(Message.Type.EXECUTE);
         this.statementId = statementId;
-        this.values = values;
-        this.consistency = consistency;
-        this.resultPageSize = resultPageSize;
-        this.skipMetadata = skipMetadata;
-        this.pagingState = pagingState;
+        this.options = options;
     }
 
     public ChannelBuffer encode(int version)
@@ -181,7 +117,7 @@ public class ExecuteMessage extends Message.Request
             if (statement == null)
                 throw new PreparedQueryNotFoundException(statementId);
 
-            if (resultPageSize == 0)
+            if (options.getPageSize() == 0)
                 throw new ProtocolException("The page size cannot be 0");
 
             UUID tracingId = null;
@@ -196,15 +132,15 @@ public class ExecuteMessage extends Message.Request
                 state.createTracingSession();
 
                 ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
-                if (resultPageSize > 0)
-                    builder.put("page_size", Integer.toString(resultPageSize));
+                if (options.getPageSize() > 0)
+                    builder.put("page_size", Integer.toString(options.getPageSize()));
 
                 // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
                 Tracing.instance.begin("Execute CQL3 prepared query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.processPrepared(statement, consistency, state, values, resultPageSize, pagingState);
-            if (skipMetadata && response instanceof ResultMessage.Rows)
+            Message.Response response = QueryProcessor.processPrepared(statement, state, options);
+            if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 
             if (tracingId != null)
@@ -225,6 +161,6 @@ public class ExecuteMessage extends Message.Request
     @Override
     public String toString()
     {
-        return "EXECUTE " + statementId + " with " + values.size() + " values at consistency " + consistency;
+        return "EXECUTE " + statementId + " with " + options.getValues().size() + " values at consistency " + options.getConsistency();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd73bea/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 9e8050c..2b2583e 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -26,8 +26,10 @@ import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
 
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.QueryState;
@@ -41,147 +43,43 @@ import org.apache.cassandra.utils.UUIDGen;
  */
 public class QueryMessage extends Message.Request
 {
-    public static enum Flag
-    {
-        // The order of that enum matters!!
-        PAGE_SIZE,
-        VALUES,
-        SKIP_METADATA,
-        PAGING_STATE;
-
-        public static EnumSet<Flag> deserialize(int flags)
-        {
-            EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
-            Flag[] values = Flag.values();
-            for (int n = 0; n < values.length; n++)
-            {
-                if ((flags & (1 << n)) != 0)
-                    set.add(values[n]);
-            }
-            return set;
-        }
-
-        public static int serialize(EnumSet<Flag> flags)
-        {
-            int i = 0;
-            for (Flag flag : flags)
-                i |= 1 << flag.ordinal();
-            return i;
-        }
-    }
-
     public static final Message.Codec<QueryMessage> codec = new Message.Codec<QueryMessage>()
     {
         public QueryMessage decode(ChannelBuffer body, int version)
         {
             String query = CBUtil.readLongString(body);
-            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-
-            int resultPageSize = -1;
-            List<ByteBuffer> values = Collections.emptyList();
-            boolean skipMetadata = false;
-            PagingState pagingState = null;
-            if (version >= 2)
+            if (version == 1)
             {
-                EnumSet<Flag> flags = Flag.deserialize((int)body.readByte());
-
-                if (flags.contains(Flag.PAGE_SIZE))
-                    resultPageSize = body.readInt();
-
-                if (flags.contains(Flag.VALUES))
-                {
-                    int paramCount = body.readUnsignedShort();
-                    values = paramCount == 0 ? Collections.<ByteBuffer>emptyList() : new ArrayList<ByteBuffer>(paramCount);
-                    for (int i = 0; i < paramCount; i++)
-                        values.add(CBUtil.readValue(body));
-                }
-
-                skipMetadata = flags.contains(Flag.SKIP_METADATA);
-
-                if (flags.contains(Flag.PAGING_STATE))
-                    pagingState = PagingState.deserialize(CBUtil.readValue(body));
+                ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
+                return new QueryMessage(query, consistency);
+            }
+            else
+            {
+                return new QueryMessage(query, QueryOptions.codec.decode(body, version));
             }
-            return new QueryMessage(query, consistency, values, resultPageSize, skipMetadata, pagingState);
         }
 
         public ChannelBuffer encode(QueryMessage msg, int version)
         {
-            // We have:
-            //   - query
-            //   - options
-            //     * optional:
-            //   - Number of values
-            //   - The values
-            int vs = msg.values.size();
-
-            EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
-            if (msg.resultPageSize >= 0)
-                flags.add(Flag.PAGE_SIZE);
-            if (vs > 0)
-                flags.add(Flag.VALUES);
-            if (msg.skipMetadata)
-                flags.add(Flag.SKIP_METADATA);
-            if (msg.pagingState != null)
-                flags.add(Flag.PAGING_STATE);
-
-            assert flags.isEmpty() || version >= 2 : "Version 1 of the protocol supports no option after the consistency level";
-
-            int nbBuff = 2;
-            if (version >= 2)
-            {
-                nbBuff++; // the flags themselves
-                if (flags.contains(Flag.PAGE_SIZE))
-                    nbBuff++;
-                if (flags.contains(Flag.VALUES))
-                    nbBuff++;
-            }
-            CBUtil.BufferBuilder builder = new CBUtil.BufferBuilder(nbBuff, 0, vs + (flags.contains(Flag.PAGING_STATE) ? 1 : 0));
-            builder.add(CBUtil.longStringToCB(msg.query));
-            builder.add(CBUtil.consistencyLevelToCB(msg.consistency));
-            if (version >= 2)
-            {
-                builder.add(CBUtil.byteToCB((byte)Flag.serialize(flags)));
-                if (flags.contains(Flag.PAGE_SIZE))
-                    builder.add(CBUtil.intToCB(msg.resultPageSize));
-                if (flags.contains(Flag.VALUES))
-                {
-                    builder.add(CBUtil.shortToCB(vs));
-                    for (ByteBuffer value : msg.values)
-                        builder.addValue(value);
-                }
-                if (flags.contains(Flag.PAGING_STATE))
-                    builder.addValue(msg.pagingState == null ? null : msg.pagingState.serialize());
-            }
-            return builder.build();
+            return ChannelBuffers.wrappedBuffer(CBUtil.longStringToCB(msg.query),
+                                                (version == 1 ? CBUtil.consistencyLevelToCB(msg.options.getConsistency())
+                                                              : QueryOptions.codec.encode(msg.options, version)));
         }
     };
 
     public final String query;
-    public final ConsistencyLevel consistency;
-    public final int resultPageSize;
-    public final List<ByteBuffer> values;
-    public final boolean skipMetadata;
-    public final PagingState pagingState;
+    public final QueryOptions options;
 
     public QueryMessage(String query, ConsistencyLevel consistency)
     {
-        this(query, consistency, Collections.<ByteBuffer>emptyList(), -1);
-    }
-
-    public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize)
-    {
-        this(query, consistency, values, resultPageSize, false, null);
+        this(query, new QueryOptions(consistency, Collections.<ByteBuffer>emptyList()));
     }
 
-    public QueryMessage(String query, ConsistencyLevel consistency, List<ByteBuffer> values, int resultPageSize, boolean skipMetadata, PagingState pagingState)
+    public QueryMessage(String query, QueryOptions options)
     {
         super(Type.QUERY);
         this.query = query;
-        this.consistency = consistency;
-        this.resultPageSize = resultPageSize;
-        this.values = values;
-        this.skipMetadata = skipMetadata;
-        this.pagingState = pagingState;
+        this.options = options;
     }
 
     public ChannelBuffer encode(int version)
@@ -193,7 +91,7 @@ public class QueryMessage extends Message.Request
     {
         try
         {
-            if (resultPageSize == 0)
+            if (options.getPageSize() == 0)
                 throw new ProtocolException("The page size cannot be 0");
 
             UUID tracingId = null;
@@ -209,14 +107,14 @@ public class QueryMessage extends Message.Request
 
                 ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
                 builder.put("query", query);
-                if (resultPageSize > 0)
-                    builder.put("page_size", Integer.toString(resultPageSize));
+                if (options.getPageSize() > 0)
+                    builder.put("page_size", Integer.toString(options.getPageSize()));
 
                 Tracing.instance.begin("Execute CQL3 query", builder.build());
             }
 
-            Message.Response response = QueryProcessor.process(query, values, consistency, state, resultPageSize, pagingState);
-            if (skipMetadata && response instanceof ResultMessage.Rows)
+            Message.Response response = QueryProcessor.process(query, state, options);
+            if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 
             if (tracingId != null)

Reply via email to