Updated Branches:
  refs/heads/trunk bc3597d35 -> 524261f88

Allow preparing timestamp, ttl and limit in queries

patch by slebresne; reviewed by iamaleksey for CASSANDRA-4450


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/524261f8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/524261f8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/524261f8

Branch: refs/heads/trunk
Commit: 524261f88cd2adcd623de3604e735b282dd5caac
Parents: bc3597d
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Mon Apr 29 09:27:36 2013 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue May 28 11:09:16 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/cql3/Attributes.java |  110 ++++++++++++++-
 src/java/org/apache/cassandra/cql3/Cql.g           |   35 +++--
 .../cassandra/cql3/statements/BatchStatement.java  |   22 ++--
 .../cassandra/cql3/statements/DeleteStatement.java |    4 +-
 .../cql3/statements/ModificationStatement.java     |   28 ++--
 .../cassandra/cql3/statements/SelectStatement.java |  103 +++++++++-----
 .../cassandra/cql3/statements/UpdateStatement.java |    8 +-
 .../cassandra/transport/messages/BatchMessage.java |    2 +-
 9 files changed, 225 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e233ba0..e630d23 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -53,6 +53,7 @@
  * Track max/min column names in sstables to be able to optimize slice
    queries (CASSANDRA-5514)
  * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 
 1.2.6
  * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index 62f98b2..511f34e 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -17,7 +17,13 @@
  */
 package org.apache.cassandra.cql3;
 
+import java.nio.ByteBuffer;
+import java.util.List;
+
 import org.apache.cassandra.db.ExpiringColumn;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
@@ -26,15 +32,107 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
  */
 public class Attributes
 {
-    public Long timestamp;
-    public int timeToLive;
+    private final Term timestamp;
+    private final Term timeToLive;
+
+    public static Attributes none()
+    {
+        return new Attributes(null, null);
+    }
+
+    private Attributes(Term timestamp, Term timeToLive)
+    {
+        this.timestamp = timestamp;
+        this.timeToLive = timeToLive;
+    }
+
+    public boolean isTimestampSet()
+    {
+        return timestamp != null;
+    }
+
+    public boolean isTimeToLiveSet()
+    {
+        return timeToLive != null;
+    }
+
+    public long getTimestamp(long now, List<ByteBuffer> variables) throws 
InvalidRequestException
+    {
+        if (timestamp == null)
+            return now;
+
+        ByteBuffer tval = timestamp.bindAndGet(variables);
+        if (tval == null)
+            throw new InvalidRequestException("Invalid null value of 
timestamp");
 
-    public void validate() throws InvalidRequestException
+        try
+        {
+            LongType.instance.validate(tval);
+        }
+        catch (MarshalException e)
+        {
+            throw new InvalidRequestException("Invalid timestamp value");
+        }
+
+        return LongType.instance.compose(tval);
+    }
+
+    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
     {
-        if (timeToLive < 0)
+        if (timeToLive == null)
+            return 0;
+
+        ByteBuffer tval = timeToLive.bindAndGet(variables);
+        if (tval == null)
+            throw new InvalidRequestException("Invalid null value of TTL");
+
+        try
+        {
+            Int32Type.instance.validate(tval);
+        }
+        catch (MarshalException e)
+        {
+            throw new InvalidRequestException("Invalid TTL value");
+        }
+
+        int ttl = Int32Type.instance.compose(tval);
+        if (ttl < 0)
             throw new InvalidRequestException("A TTL must be greater or equal to 0");
 
-        if (timeToLive > ExpiringColumn.MAX_TTL)
-            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
+        if (ttl > ExpiringColumn.MAX_TTL)
+            throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", ttl, ExpiringColumn.MAX_TTL));
+
+        return ttl;
+    }
+
+    public void collectMarkerSpecification(ColumnSpecification[] boundNames)
+    {
+        if (timestamp != null)
+            timestamp.collectMarkerSpecification(boundNames);
+        if (timeToLive != null)
+            timeToLive.collectMarkerSpecification(boundNames);
+    }
+
+    public static class Raw
+    {
+        public Term.Raw timestamp;
+        public Term.Raw timeToLive;
+
+        public Attributes prepare(String ksName, String cfName) throws 
InvalidRequestException
+        {
+            Term ts = timestamp == null ? null : 
timestamp.prepare(timestampReceiver(ksName, cfName));
+            Term ttl = timeToLive == null ? null : 
timeToLive.prepare(timeToLiveReceiver(ksName, cfName));
+            return new Attributes(ts, ttl);
+        }
+
+        private ColumnSpecification timestampReceiver(String ksName, String 
cfName)
+        {
+            return new ColumnSpecification(ksName, cfName, new 
ColumnIdentifier("[timestamp]", true), LongType.instance);
+        }
+
+        private ColumnSpecification timeToLiveReceiver(String ksName, String 
cfName)
+        {
+            return new ColumnSpecification(ksName, cfName, new 
ColumnIdentifier("[ttl]", true), Int32Type.instance);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/Cql.g
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g
index 8d31de9..913f6ea 100644
--- a/src/java/org/apache/cassandra/cql3/Cql.g
+++ b/src/java/org/apache/cassandra/cql3/Cql.g
@@ -212,7 +212,7 @@ selectStatement returns [SelectStatement.RawStatement expr]
     @init {
         boolean isCount = false;
         ColumnIdentifier countAlias = null;
-        int limit = Integer.MAX_VALUE;
+        Term.Raw limit = null;
         Map<ColumnIdentifier, Boolean> orderings = new 
LinkedHashMap<ColumnIdentifier, Boolean>();
         boolean allowFiltering = false;
     }
@@ -221,15 +221,14 @@ selectStatement returns [SelectStatement.RawStatement 
expr]
       K_FROM cf=columnFamilyName
       ( K_WHERE wclause=whereClause )?
       ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] 
)* )?
-      ( K_LIMIT rows=INTEGER { limit = Integer.parseInt($rows.text); } )?
+      ( K_LIMIT rows=intValue { limit = rows; } )?
       ( K_ALLOW K_FILTERING  { allowFiltering = true; } )?
       {
-          SelectStatement.Parameters params = new 
SelectStatement.Parameters(limit,
-                                                                             
orderings,
+          SelectStatement.Parameters params = new 
SelectStatement.Parameters(orderings,
                                                                              
isCount,
                                                                              
countAlias,
                                                                              
allowFiltering);
-          $expr = new SelectStatement.RawStatement(cf, params, sclause, 
wclause);
+          $expr = new SelectStatement.RawStatement(cf, params, sclause, 
wclause, limit);
       }
     ;
 
@@ -283,7 +282,7 @@ orderByClause[Map<ColumnIdentifier, Boolean> orderings]
  */
 insertStatement returns [UpdateStatement.ParsedInsert expr]
     @init {
-        Attributes attrs = new Attributes();
+        Attributes.Raw attrs = new Attributes.Raw();
         List<ColumnIdentifier> columnNames  = new 
ArrayList<ColumnIdentifier>();
         List<Term.Raw> values = new ArrayList<Term.Raw>();
     }
@@ -297,21 +296,21 @@ insertStatement returns [UpdateStatement.ParsedInsert 
expr]
       }
     ;
 
-usingClause[Attributes attrs]
+usingClause[Attributes.Raw attrs]
     : K_USING usingClauseObjective[attrs] ( K_AND? usingClauseObjective[attrs] 
)*
     ;
 
-usingClauseDelete[Attributes attrs]
+usingClauseDelete[Attributes.Raw attrs]
     : K_USING usingClauseDeleteObjective[attrs] ( K_AND? 
usingClauseDeleteObjective[attrs] )*
     ;
 
-usingClauseDeleteObjective[Attributes attrs]
-    : K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); }
+usingClauseDeleteObjective[Attributes.Raw attrs]
+    : K_TIMESTAMP ts=intValue { attrs.timestamp = ts; }
     ;
 
-usingClauseObjective[Attributes attrs]
+usingClauseObjective[Attributes.Raw attrs]
     : usingClauseDeleteObjective[attrs]
-    | K_TTL t=INTEGER { attrs.timeToLive = Integer.valueOf($t.text); }
+    | K_TTL t=intValue { attrs.timeToLive = t; }
     ;
 
 /**
@@ -322,7 +321,7 @@ usingClauseObjective[Attributes attrs]
  */
 updateStatement returns [UpdateStatement.ParsedUpdate expr]
     @init {
-        Attributes attrs = new Attributes();
+        Attributes.Raw attrs = new Attributes.Raw();
         List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations = new 
ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>();
         boolean ifNotExists = false;
     }
@@ -354,7 +353,7 @@ updateCondition returns [List<Pair<ColumnIdentifier, 
Operation.RawUpdate>> condi
  */
 deleteStatement returns [DeleteStatement.Parsed expr]
     @init {
-        Attributes attrs = new Attributes();
+        Attributes.Raw attrs = new Attributes.Raw();
         List<Operation.RawDeletion> columnDeletions = Collections.emptyList();
     }
     : K_DELETE ( dels=deleteSelection { columnDeletions = dels; } )?
@@ -410,7 +409,7 @@ batchStatement returns [BatchStatement.Parsed expr]
     @init {
         BatchStatement.Type type = BatchStatement.Type.LOGGED;
         List<ModificationStatement.Parsed> statements = new 
ArrayList<ModificationStatement.Parsed>();
-        Attributes attrs = new Attributes();
+        Attributes.Raw attrs = new Attributes.Raw();
     }
     : K_BEGIN
       ( K_UNLOGGED { type = BatchStatement.Type.UNLOGGED; } | K_COUNTER { type 
= BatchStatement.Type.COUNTER; } )?
@@ -738,6 +737,12 @@ value returns [Term.Raw value]
     | QMARK                { $value = new 
AbstractMarker.Raw(++currentBindMarkerIdx); }
     ;
 
+intValue returns [Term.Raw value]
+    // No empty alternative: a token is mandatory, otherwise a bare LIMIT/TTL/TIMESTAMP keyword would parse with a null value.
+    : t=INTEGER { $value = Constants.Literal.integer($t.text); }
+    | QMARK     { $value = new AbstractMarker.Raw(++currentBindMarkerIdx); }
+    ;
+
 functionName returns [String s]
     : f=IDENT                       { $s = $f.text; }
     | u=unreserved_function_keyword { $s = u; }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index d6d0e16..777c80f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -76,23 +76,16 @@ public class BatchStatement implements CQLStatement
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        if (attrs.timeToLive != 0)
+        if (attrs.isTimeToLiveSet())
             throw new InvalidRequestException("Global TTL on the BATCH 
statement is not supported.");
 
         for (ModificationStatement statement : statements)
         {
-            statement.validate(state);
-
-            if (attrs.timestamp != null && statement.isSetTimestamp())
+            if (attrs.isTimestampSet() && statement.isTimestampSet())
                 throw new InvalidRequestException("Timestamp must be set 
either on BATCH or individual statements");
         }
     }
 
-    public long getTimestamp(long now)
-    {
-        return attrs.timestamp == null ? now : attrs.timestamp;
-    }
-
     private Collection<? extends IMutation> getMutations(List<ByteBuffer> 
variables, boolean local, ConsistencyLevel cl, long now)
     throws RequestExecutionException, RequestValidationException
     {
@@ -125,7 +118,7 @@ public class BatchStatement implements CQLStatement
     throws RequestExecutionException, RequestValidationException
     {
         // Group mutation together, otherwise they won't get applied atomically
-        for (IMutation m : statement.getMutations(variables, local, cl, 
getTimestamp(now), true))
+        for (IMutation m : statement.getMutations(variables, local, cl, 
attrs.getTimestamp(now, variables), true))
         {
             Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key());
             IMutation existing = mutations.get(key);
@@ -179,10 +172,10 @@ public class BatchStatement implements CQLStatement
     public static class Parsed extends CFStatement
     {
         private final Type type;
-        private final Attributes attrs;
+        private final Attributes.Raw attrs;
         private final List<ModificationStatement.Parsed> parsedStatements;
 
-        public Parsed(Type type, Attributes attrs, 
List<ModificationStatement.Parsed> parsedStatements)
+        public Parsed(Type type, Attributes.Raw attrs, 
List<ModificationStatement.Parsed> parsedStatements)
         {
             super(null);
             this.type = type;
@@ -217,7 +210,10 @@ public class BatchStatement implements CQLStatement
                 statements.add(stmt);
             }
 
-            return new ParsedStatement.Prepared(new 
BatchStatement(getBoundsTerms(), type, statements, attrs), 
Arrays.<ColumnSpecification>asList(boundNames));
+            Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
+            prepAttrs.collectMarkerSpecification(boundNames);
+
+            return new ParsedStatement.Prepared(new 
BatchStatement(getBoundsTerms(), type, statements, prepAttrs), 
Arrays.<ColumnSpecification>asList(boundNames));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index db3c41c..54a1034 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -93,7 +93,7 @@ public class DeleteStatement extends ModificationStatement
         private final List<Relation> whereClause;
 
         public Parsed(CFName name,
-                      Attributes attrs,
+                      Attributes.Raw attrs,
                       List<Operation.RawDeletion> deletions,
                       List<Relation> whereClause,
                       List<Pair<ColumnIdentifier, Operation.RawUpdate>> 
conditions)
@@ -103,7 +103,7 @@ public class DeleteStatement extends ModificationStatement
             this.whereClause = whereClause;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames, Attributes attrs) throws 
InvalidRequestException
         {
             DeleteStatement stmt = new DeleteStatement(getBoundsTerms(), 
cfDef.cfm, attrs);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 7766f94..f6b7140 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -85,19 +85,19 @@ public abstract class ModificationStatement implements 
CQLStatement
         return cfm.getDefaultValidator().isCommutative();
     }
 
-    public int getTimeToLive()
+    public long getTimestamp(long now, List<ByteBuffer> variables) throws 
InvalidRequestException
     {
-        return attrs.timeToLive;
+        return attrs.getTimestamp(now, variables);
     }
 
-    public long getTimestamp(long now)
+    public boolean isTimestampSet()
     {
-        return attrs.timestamp == null ? now : attrs.timestamp;
+        return attrs.isTimestampSet();
     }
 
-    public boolean isSetTimestamp()
+    public int getTimeToLive(List<ByteBuffer> variables) throws 
InvalidRequestException
     {
-        return attrs.timestamp != null;
+        return attrs.getTimeToLive(variables);
     }
 
     public void checkAccess(ClientState state) throws InvalidRequestException, 
UnauthorizedException
@@ -107,7 +107,6 @@ public abstract class ModificationStatement implements 
CQLStatement
 
     public void validate(ClientState state) throws InvalidRequestException
     {
-        attrs.validate();
     }
 
     public void addOperation(Operation op)
@@ -363,7 +362,7 @@ public abstract class ModificationStatement implements 
CQLStatement
             throw new InvalidRequestException("IN on the partition key is not 
supported with conditional updates");
 
         ColumnNameBuilder clusteringPrefix = 
createClusteringPrefixBuilder(variables);
-        UpdateParameters params = new UpdateParameters(cfm, variables, 
getTimestamp(queryState.getTimestamp()), getTimeToLive(), null);
+        UpdateParameters params = new UpdateParameters(cfm, variables, 
getTimestamp(queryState.getTimestamp(), variables), getTimeToLive(variables), 
null);
 
         ByteBuffer key = keys.get(0);
         ThriftValidation.validateKey(cfm, key);
@@ -407,7 +406,7 @@ public abstract class ModificationStatement implements 
CQLStatement
 
         // Some lists operation requires reading
         Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, 
clusteringPrefix, local, cl);
-        UpdateParameters params = new UpdateParameters(cfm, variables, 
getTimestamp(now), getTimeToLive(), rows);
+        UpdateParameters params = new UpdateParameters(cfm, variables, 
getTimestamp(now, variables), getTimeToLive(variables), rows);
 
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
@@ -449,11 +448,11 @@ public abstract class ModificationStatement implements 
CQLStatement
 
     public static abstract class Parsed extends CFStatement
     {
-        protected final Attributes attrs;
+        protected final Attributes.Raw attrs;
         private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> 
conditions;
         private final boolean ifNotExists;
 
-        protected Parsed(CFName name, Attributes attrs, 
List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean 
ifNotExists)
+        protected Parsed(CFName name, Attributes.Raw attrs, 
List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean 
ifNotExists)
         {
             super(name);
             this.attrs = attrs;
@@ -473,7 +472,10 @@ public abstract class ModificationStatement implements 
CQLStatement
             CFMetaData metadata = 
ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
             CFDefinition cfDef = metadata.getCfDef();
 
-            ModificationStatement stmt = prepareInternal(cfDef, boundNames);
+            Attributes preparedAttributes = attrs.prepare(keyspace(), 
columnFamily());
+            preparedAttributes.collectMarkerSpecification(boundNames);
+
+            ModificationStatement stmt = prepareInternal(cfDef, boundNames, 
preparedAttributes);
 
             if (ifNotExists || (conditions != null && !conditions.isEmpty()))
             {
@@ -528,6 +530,6 @@ public abstract class ModificationStatement implements 
CQLStatement
             return stmt;
         }
 
-        protected abstract ModificationStatement prepareInternal(CFDefinition 
cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException;
+        protected abstract ModificationStatement prepareInternal(CFDefinition 
cfDef, ColumnSpecification[] boundNames, Attributes attrs) throws 
InvalidRequestException;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index d45b730..9630771 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -58,6 +58,7 @@ public class SelectStatement implements CQLStatement
     public final CFDefinition cfDef;
     public final Parameters parameters;
     private final Selection selection;
+    private final Term limit;
 
     private final Restriction[] keyRestrictions;
     private final Restriction[] columnRestrictions;
@@ -92,7 +93,7 @@ public class SelectStatement implements CQLStatement
         }
     }
 
-    public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters 
parameters, Selection selection)
+    public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters 
parameters, Selection selection, Term limit)
     {
         this.cfDef = cfDef;
         this.boundTerms = boundTerms;
@@ -100,6 +101,7 @@ public class SelectStatement implements CQLStatement
         this.keyRestrictions = new Restriction[cfDef.keys.size()];
         this.columnRestrictions = new Restriction[cfDef.columns.size()];
         this.parameters = parameters;
+        this.limit = limit;
     }
 
     public int getBoundsTerms()
@@ -124,17 +126,18 @@ public class SelectStatement implements CQLStatement
 
         cl.validateForRead(keyspace());
 
+        int limit = getLimit(variables);
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? 
StorageProxy.getRangeSlice(getRangeCommand(variables), cl)
-                       : StorageProxy.read(getSliceCommands(variables), cl);
+                       ? StorageProxy.getRangeSlice(getRangeCommand(variables, 
limit), cl)
+                       : StorageProxy.read(getSliceCommands(variables, limit), 
cl);
 
-        return processResults(rows, variables);
+        return processResults(rows, variables, limit);
     }
 
-    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> 
variables) throws RequestValidationException
+    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> 
variables, int limit) throws RequestValidationException
     {
         // Even for count, we need to process the result as it'll group some 
column together in sparse column families
-        ResultSet rset = process(rows, variables);
+        ResultSet rset = process(rows, variables, limit);
         rset = parameters.isCount ? 
rset.makeCountResult(parameters.countAlias) : rset;
         return new ResultMessage.Rows(rset);
     }
@@ -150,17 +153,19 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state) throws 
RequestExecutionException, RequestValidationException
     {
+        List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
+        int limit = getLimit(variables);
         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                       ? 
RangeSliceVerbHandler.executeLocally(getRangeCommand(Collections.<ByteBuffer>emptyList()))
-                       : readLocally(keyspace(), 
getSliceCommands(Collections.<ByteBuffer>emptyList()));
+                       ? 
RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit))
+                       : readLocally(keyspace(), getSliceCommands(variables, 
limit));
 
-        return processResults(rows, Collections.<ByteBuffer>emptyList());
+        return processResults(rows, variables, limit);
     }
 
     public ResultSet process(List<Row> rows) throws InvalidRequestException
     {
         assert !parameters.isCount; // not yet needed
-        return process(rows, Collections.<ByteBuffer>emptyList());
+        return process(rows, Collections.<ByteBuffer>emptyList(), 
getLimit(Collections.<ByteBuffer>emptyList()));
     }
 
     public String keyspace()
@@ -173,12 +178,12 @@ public class SelectStatement implements CQLStatement
         return cfDef.cfm.cfName;
     }
 
-    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables) 
throws RequestValidationException
+    private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int 
limit) throws RequestValidationException
     {
         Collection<ByteBuffer> keys = getKeys(variables);
         List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
 
-        IDiskAtomFilter filter = makeFilter(variables);
+        IDiskAtomFilter filter = makeFilter(variables, limit);
         // Note that we use the total limit for every key, which is 
potentially inefficient.
         // However, IN + LIMIT is not a very sensible choice.
         for (ByteBuffer key : keys)
@@ -192,9 +197,9 @@ public class SelectStatement implements CQLStatement
         return commands;
     }
 
-    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables) 
throws RequestValidationException
+    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int 
limit) throws RequestValidationException
     {
-        IDiskAtomFilter filter = makeFilter(variables);
+        IDiskAtomFilter filter = makeFilter(variables, limit);
         List<IndexExpression> expressions = getIndexExpressions(variables);
         // The LIMIT provided by the user is the number of CQL row he wants 
returned.
         // We want to have getRangeSlice to count the number of columns, not 
the number of keys.
@@ -203,7 +208,7 @@ public class SelectStatement implements CQLStatement
                                      filter,
                                      getKeyBounds(variables),
                                      expressions,
-                                     getLimit(),
+                                     limit,
                                      true,
                                      false);
     }
@@ -252,7 +257,7 @@ public class SelectStatement implements CQLStatement
         return bounds;
     }
 
-    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables)
+    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
     throws InvalidRequestException
     {
         if (isColumnRange())
@@ -266,7 +271,7 @@ public class SelectStatement implements CQLStatement
                                                 getRequestedBound(Bound.END, 
variables));
             SliceQueryFilter filter = new SliceQueryFilter(new 
ColumnSlice[]{slice},
                                                            isReversed,
-                                                           getLimit(),
+                                                           limit,
                                                            toGroup);
             QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
             return filter;
@@ -279,13 +284,35 @@ public class SelectStatement implements CQLStatement
         }
     }
 
-    private int getLimit()
+    private int getLimit(List<ByteBuffer> variables) throws 
InvalidRequestException
     {
+        int l = Integer.MAX_VALUE;
+        if (limit != null)
+        {
+            ByteBuffer b = limit.bindAndGet(variables);
+            if (b == null)
+                throw new InvalidRequestException("Invalid null value of 
limit");
+
+            try
+            {
+                Int32Type.instance.validate(b);
+                l = Int32Type.instance.compose(b);
+            }
+            catch (MarshalException e)
+            {
+                throw new InvalidRequestException("Invalid limit value");
+            }
+        }
+
+        if (l <= 0)
+            throw new InvalidRequestException("LIMIT must be strictly 
positive");
+
         // Internally, we don't support exclusive bounds for slices. Instead,
         // we query one more element if necessary and exclude
-        return sliceRestriction != null && 
!sliceRestriction.isInclusive(Bound.START) && parameters.limit != 
Integer.MAX_VALUE
-             ? parameters.limit + 1
-             : parameters.limit;
+        if (sliceRestriction != null && 
!sliceRestriction.isInclusive(Bound.START) && l != Integer.MAX_VALUE)
+            l += 1;
+
+        return l;
     }
 
     private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) 
throws InvalidRequestException
@@ -634,7 +661,7 @@ public class SelectStatement implements CQLStatement
         };
     }
 
-    private ResultSet process(List<Row> rows, List<ByteBuffer> variables) 
throws InvalidRequestException
+    private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int 
limit) throws InvalidRequestException
     {
         Selection.ResultSetBuilder result = selection.resultSetBuilder();
         for (org.apache.cassandra.db.Row row : rows)
@@ -740,7 +767,7 @@ public class SelectStatement implements CQLStatement
             cqlRows.reverse();
 
         // Trim result if needed to respect the limit
-        cqlRows.trim(parameters.limit);
+        cqlRows.trim(limit);
         return cqlRows;
     }
 
@@ -839,22 +866,21 @@ public class SelectStatement implements CQLStatement
         private final Parameters parameters;
         private final List<RawSelector> selectClause;
         private final List<Relation> whereClause;
+        private final Term.Raw limit;
 
-        public RawStatement(CFName cfName, Parameters parameters, 
List<RawSelector> selectClause, List<Relation> whereClause)
+        public RawStatement(CFName cfName, Parameters parameters, 
List<RawSelector> selectClause, List<Relation> whereClause, Term.Raw limit)
         {
             super(cfName);
             this.parameters = parameters;
             this.selectClause = selectClause;
             this.whereClause = whereClause == null ? 
Collections.<Relation>emptyList() : whereClause;
+            this.limit = limit;
         }
 
         public ParsedStatement.Prepared prepare() throws 
InvalidRequestException
         {
             CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), 
columnFamily());
 
-            if (parameters.limit <= 0)
-                throw new InvalidRequestException("LIMIT must be strictly 
positive");
-
             CFDefinition cfDef = cfm.getCfDef();
 
             ColumnSpecification[] names = new 
ColumnSpecification[getBoundsTerms()];
@@ -868,7 +894,14 @@ public class SelectStatement implements CQLStatement
                                 ? Selection.wildcard(cfDef)
                                 : Selection.fromSelectors(cfDef, selectClause);
 
-            SelectStatement stmt = new SelectStatement(cfDef, 
getBoundsTerms(), parameters, selection);
+            Term prepLimit = null;
+            if (limit != null)
+            {
+                prepLimit = limit.prepare(limitReceiver());
+                prepLimit.collectMarkerSpecification(names);
+            }
+
+            SelectStatement stmt = new SelectStatement(cfDef, 
getBoundsTerms(), parameters, selection, prepLimit);
 
             /*
              * WHERE clause. For a given entity, rules are:
@@ -1209,6 +1242,11 @@ public class SelectStatement implements CQLStatement
                                                });
         }
 
+        private ColumnSpecification limitReceiver()
+        {
+            return new ColumnSpecification(keyspace(), columnFamily(), new 
ColumnIdentifier("[limit]", true), Int32Type.instance);
+        }
+
         Restriction updateRestriction(CFDefinition.Name name, Restriction 
restriction, Relation newRel, ColumnSpecification[] boundNames) throws 
InvalidRequestException
         {
             ColumnSpecification receiver = name;
@@ -1265,12 +1303,11 @@ public class SelectStatement implements CQLStatement
         @Override
         public String toString()
         {
-            return String.format("SelectRawStatement[name=%s, selectClause=%s, 
whereClause=%s, isCount=%s, limit=%s]",
+            return String.format("SelectRawStatement[name=%s, selectClause=%s, 
whereClause=%s, isCount=%s]",
                     cfName,
                     selectClause,
                     whereClause,
-                    parameters.isCount,
-                    parameters.limit);
+                    parameters.isCount);
         }
     }
 
@@ -1409,15 +1446,13 @@ public class SelectStatement implements CQLStatement
 
     public static class Parameters
     {
-        private final int limit;
         private final Map<ColumnIdentifier, Boolean> orderings;
         private final boolean isCount;
         private final ColumnIdentifier countAlias;
         private final boolean allowFiltering;
 
-        public Parameters(int limit, Map<ColumnIdentifier, Boolean> orderings, 
boolean isCount, ColumnIdentifier countAlias, boolean allowFiltering)
+        public Parameters(Map<ColumnIdentifier, Boolean> orderings, boolean 
isCount, ColumnIdentifier countAlias, boolean allowFiltering)
         {
-            this.limit = limit;
             this.orderings = orderings;
             this.isCount = isCount;
             this.countAlias = countAlias;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index bc05dd6..cff4105 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -115,14 +115,14 @@ public class UpdateStatement extends ModificationStatement
          * @param columnValues list of column values (corresponds to names)
          * @param attrs additional attributes for statement (CL, timestamp, 
timeToLive)
          */
-        public ParsedInsert(CFName name, Attributes attrs, 
List<ColumnIdentifier> columnNames, List<Term.Raw> columnValues)
+        public ParsedInsert(CFName name, Attributes.Raw attrs, 
List<ColumnIdentifier> columnNames, List<Term.Raw> columnValues)
         {
             super(name, attrs, Collections.<Pair<ColumnIdentifier, 
Operation.RawUpdate>>emptyList(), false);
             this.columnNames = columnNames;
             this.columnValues = columnValues;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames, Attributes attrs) throws 
InvalidRequestException
         {
             UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), 
cfDef.cfm, attrs);
 
@@ -182,7 +182,7 @@ public class UpdateStatement extends ModificationStatement
          * @param whereClause the where clause
          */
         public ParsedUpdate(CFName name,
-                            Attributes attrs,
+                            Attributes.Raw attrs,
                             List<Pair<ColumnIdentifier, Operation.RawUpdate>> 
updates,
                             List<Relation> whereClause,
                             List<Pair<ColumnIdentifier, Operation.RawUpdate>> 
conditions,
@@ -193,7 +193,7 @@ public class UpdateStatement extends ModificationStatement
             this.whereClause = whereClause;
         }
 
-        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames) throws InvalidRequestException
+        protected ModificationStatement prepareInternal(CFDefinition cfDef, 
ColumnSpecification[] boundNames, Attributes attrs) throws 
InvalidRequestException
         {
             UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), 
cfDef.cfm, attrs);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java 
b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 3bec918..ed8aaaf 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -221,7 +221,7 @@ public class BatchMessage extends Message.Request
 
             // Note: It's ok at this point to pass a bogus value for the 
number of bound terms in the BatchState ctor
             // (and no value would be really correct, so we prefer passing a 
clearly wrong one).
-            BatchStatement batch = new BatchStatement(-1, type, statements, 
new Attributes());
+            BatchStatement batch = new BatchStatement(-1, type, statements, 
Attributes.none());
             Message.Response response = QueryProcessor.processBatch(batch, 
consistency, state, values);
 
             if (tracingId != null)

Reply via email to