Updated Branches: refs/heads/trunk bc3597d35 -> 524261f88
Allow preparing timestamp, ttl and limit in queries patch by slebresne; reviewed by iamaleksey for CASSANDRA-4450 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/524261f8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/524261f8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/524261f8 Branch: refs/heads/trunk Commit: 524261f88cd2adcd623de3604e735b282dd5caac Parents: bc3597d Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Apr 29 09:27:36 2013 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue May 28 11:09:16 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cql3/Attributes.java | 110 ++++++++++++++- src/java/org/apache/cassandra/cql3/Cql.g | 35 +++-- .../cassandra/cql3/statements/BatchStatement.java | 22 ++-- .../cassandra/cql3/statements/DeleteStatement.java | 4 +- .../cql3/statements/ModificationStatement.java | 28 ++-- .../cassandra/cql3/statements/SelectStatement.java | 103 +++++++++----- .../cassandra/cql3/statements/UpdateStatement.java | 8 +- .../cassandra/transport/messages/BatchMessage.java | 2 +- 9 files changed, 225 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e233ba0..e630d23 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -53,6 +53,7 @@ * Track max/min column names in sstables to be able to optimize slice queries (CASSANDRA-5514) * Binary protocol: allow batching already prepared statements (CASSANDRA-4693) + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450) 1.2.6 * (Hadoop) Fix InputKeyRange in CFIF (CASSANDRA-5536) http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/Attributes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java index 62f98b2..511f34e 100644 --- a/src/java/org/apache/cassandra/cql3/Attributes.java +++ b/src/java/org/apache/cassandra/cql3/Attributes.java @@ -17,7 +17,13 @@ */ package org.apache.cassandra.cql3; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.cassandra.db.ExpiringColumn; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.exceptions.InvalidRequestException; /** @@ -26,15 +32,107 @@ import org.apache.cassandra.exceptions.InvalidRequestException; */ public class Attributes { - public Long timestamp; - public int timeToLive; + private final Term timestamp; + private final Term timeToLive; + + public static Attributes none() + { + return new Attributes(null, null); + } + + private Attributes(Term timestamp, Term timeToLive) + { + this.timestamp = timestamp; + this.timeToLive = timeToLive; + } + + public boolean isTimestampSet() + { + return timestamp != null; + } + + public boolean isTimeToLiveSet() + { + return timeToLive != null; + } + + public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException + { + if (timestamp == null) + return now; + + ByteBuffer tval = timestamp.bindAndGet(variables); + if (tval == null) + throw new InvalidRequestException("Invalid null value of timestamp"); - public void validate() throws InvalidRequestException + try + { + LongType.instance.validate(tval); + } + catch (MarshalException e) + { + throw new InvalidRequestException("Invalid timestamp value"); + } + + return LongType.instance.compose(tval); + } + + public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException { - if (timeToLive < 0) + if (timeToLive == null) + return 0; + + ByteBuffer tval = timeToLive.bindAndGet(variables); + if (tval == null) + throw new InvalidRequestException("Invalid null value of TTL"); + + try + { + Int32Type.instance.validate(tval); + } + catch (MarshalException e) + { + throw new InvalidRequestException("Invalid timestamp value"); + } + + int ttl = Int32Type.instance.compose(tval); + if (ttl < 0) throw new InvalidRequestException("A TTL must be greater or equal to 0"); - if (timeToLive > ExpiringColumn.MAX_TTL) - throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL)); + if (ttl > ExpiringColumn.MAX_TTL) + throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", ttl, ExpiringColumn.MAX_TTL)); + + return ttl; + } + + public void collectMarkerSpecification(ColumnSpecification[] boundNames) + { + if (timestamp != null) + timestamp.collectMarkerSpecification(boundNames); + if (timeToLive != null) + timeToLive.collectMarkerSpecification(boundNames); + } + + public static class Raw + { + public Term.Raw timestamp; + public Term.Raw timeToLive; + + public Attributes prepare(String ksName, String cfName) throws InvalidRequestException + { + Term ts = timestamp == null ? null : timestamp.prepare(timestampReceiver(ksName, cfName)); + Term ttl = timeToLive == null ? null : timeToLive.prepare(timeToLiveReceiver(ksName, cfName)); + return new Attributes(ts, ttl); + } + + private ColumnSpecification timestampReceiver(String ksName, String cfName) + { + return new ColumnSpecification(ksName, cfName, new ColumnIdentifier("[timestamp]", true), LongType.instance); + } + + private ColumnSpecification timeToLiveReceiver(String ksName, String cfName) + { + return new ColumnSpecification(ksName, cfName, new ColumnIdentifier("[ttl]", true), Int32Type.instance); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index 8d31de9..913f6ea 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -212,7 +212,7 @@ selectStatement returns [SelectStatement.RawStatement expr] @init { boolean isCount = false; ColumnIdentifier countAlias = null; - int limit = Integer.MAX_VALUE; + Term.Raw limit = null; Map<ColumnIdentifier, Boolean> orderings = new LinkedHashMap<ColumnIdentifier, Boolean>(); boolean allowFiltering = false; } @@ -221,15 +221,14 @@ selectStatement returns [SelectStatement.RawStatement expr] K_FROM cf=columnFamilyName ( K_WHERE wclause=whereClause )? ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )? - ( K_LIMIT rows=INTEGER { limit = Integer.parseInt($rows.text); } )? + ( K_LIMIT rows=intValue { limit = rows; } )? ( K_ALLOW K_FILTERING { allowFiltering = true; } )? { - SelectStatement.Parameters params = new SelectStatement.Parameters(limit, - orderings, + SelectStatement.Parameters params = new SelectStatement.Parameters(orderings, isCount, countAlias, allowFiltering); - $expr = new SelectStatement.RawStatement(cf, params, sclause, wclause); + $expr = new SelectStatement.RawStatement(cf, params, sclause, wclause, limit); } ; @@ -283,7 +282,7 @@ orderByClause[Map<ColumnIdentifier, Boolean> orderings] */ insertStatement returns [UpdateStatement.ParsedInsert expr] @init { - Attributes attrs = new Attributes(); + Attributes.Raw attrs = new Attributes.Raw(); List<ColumnIdentifier> columnNames = new ArrayList<ColumnIdentifier>(); List<Term.Raw> values = new ArrayList<Term.Raw>(); } @@ -297,21 +296,21 @@ insertStatement returns [UpdateStatement.ParsedInsert expr] } ; -usingClause[Attributes attrs] +usingClause[Attributes.Raw attrs] : K_USING usingClauseObjective[attrs] ( K_AND? usingClauseObjective[attrs] )* ; -usingClauseDelete[Attributes attrs] +usingClauseDelete[Attributes.Raw attrs] : K_USING usingClauseDeleteObjective[attrs] ( K_AND? usingClauseDeleteObjective[attrs] )* ; -usingClauseDeleteObjective[Attributes attrs] - : K_TIMESTAMP ts=INTEGER { attrs.timestamp = Long.valueOf($ts.text); } +usingClauseDeleteObjective[Attributes.Raw attrs] + : K_TIMESTAMP ts=intValue { attrs.timestamp = ts; } ; -usingClauseObjective[Attributes attrs] +usingClauseObjective[Attributes.Raw attrs] : usingClauseDeleteObjective[attrs] - | K_TTL t=INTEGER { attrs.timeToLive = Integer.valueOf($t.text); } + | K_TTL t=intValue { attrs.timeToLive = t; } ; /** @@ -322,7 +321,7 @@ usingClauseObjective[Attributes attrs] */ updateStatement returns [UpdateStatement.ParsedUpdate expr] @init { - Attributes attrs = new Attributes(); + Attributes.Raw attrs = new Attributes.Raw(); List<Pair<ColumnIdentifier, Operation.RawUpdate>> operations = new ArrayList<Pair<ColumnIdentifier, Operation.RawUpdate>>(); boolean ifNotExists = false; } @@ -354,7 +353,7 @@ updateCondition returns [List<Pair<ColumnIdentifier, Operation.RawUpdate>> condi */ deleteStatement returns [DeleteStatement.Parsed expr] @init { - Attributes attrs = new Attributes(); + Attributes.Raw attrs = new Attributes.Raw(); List<Operation.RawDeletion> columnDeletions = Collections.emptyList(); } : K_DELETE ( dels=deleteSelection { columnDeletions = dels; } )? @@ -410,7 +409,7 @@ batchStatement returns [BatchStatement.Parsed expr] @init { BatchStatement.Type type = BatchStatement.Type.LOGGED; List<ModificationStatement.Parsed> statements = new ArrayList<ModificationStatement.Parsed>(); - Attributes attrs = new Attributes(); + Attributes.Raw attrs = new Attributes.Raw(); } : K_BEGIN ( K_UNLOGGED { type = BatchStatement.Type.UNLOGGED; } | K_COUNTER { type = BatchStatement.Type.COUNTER; } )? @@ -738,6 +737,12 @@ value returns [Term.Raw value] | QMARK { $value = new AbstractMarker.Raw(++currentBindMarkerIdx); } ; +intValue returns [Term.Raw value] + : + | t=INTEGER { $value = Constants.Literal.integer($t.text); } + | QMARK { $value = new AbstractMarker.Raw(++currentBindMarkerIdx); } + ; + functionName returns [String s] : f=IDENT { $s = $f.text; } | u=unreserved_function_keyword { $s = u; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index d6d0e16..777c80f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -76,23 +76,16 @@ public class BatchStatement implements CQLStatement public void validate(ClientState state) throws InvalidRequestException { - if (attrs.timeToLive != 0) + if (attrs.isTimeToLiveSet()) throw new InvalidRequestException("Global TTL on the BATCH statement is not supported."); for (ModificationStatement statement : statements) { - statement.validate(state); - - if (attrs.timestamp != null && statement.isSetTimestamp()) + if (attrs.isTimestampSet() && statement.isTimestampSet()) throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements"); } } - public long getTimestamp(long now) - { - return attrs.timestamp == null ? now : attrs.timestamp; - } - private Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now) throws RequestExecutionException, RequestValidationException { @@ -125,7 +118,7 @@ public class BatchStatement implements CQLStatement throws RequestExecutionException, RequestValidationException { // Group mutation together, otherwise they won't get applied atomically - for (IMutation m : statement.getMutations(variables, local, cl, getTimestamp(now), true)) + for (IMutation m : statement.getMutations(variables, local, cl, attrs.getTimestamp(now, variables), true)) { Pair<String, ByteBuffer> key = Pair.create(m.getTable(), m.key()); IMutation existing = mutations.get(key); @@ -179,10 +172,10 @@ public class BatchStatement implements CQLStatement public static class Parsed extends CFStatement { private final Type type; - private final Attributes attrs; + private final Attributes.Raw attrs; private final List<ModificationStatement.Parsed> parsedStatements; - public Parsed(Type type, Attributes attrs, List<ModificationStatement.Parsed> parsedStatements) + public Parsed(Type type, Attributes.Raw attrs, List<ModificationStatement.Parsed> parsedStatements) { super(null); this.type = type; @@ -217,7 +210,10 @@ public class BatchStatement implements CQLStatement statements.add(stmt); } - return new ParsedStatement.Prepared(new BatchStatement(getBoundsTerms(), type, statements, attrs), Arrays.<ColumnSpecification>asList(boundNames)); + Attributes prepAttrs = attrs.prepare("[batch]", "[batch]"); + prepAttrs.collectMarkerSpecification(boundNames); + + return new ParsedStatement.Prepared(new BatchStatement(getBoundsTerms(), type, statements, prepAttrs), Arrays.<ColumnSpecification>asList(boundNames)); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index db3c41c..54a1034 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -93,7 +93,7 @@ public class DeleteStatement extends ModificationStatement private final List<Relation> whereClause; public Parsed(CFName name, - Attributes attrs, + Attributes.Raw attrs, List<Operation.RawDeletion> deletions, List<Relation> whereClause, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions) @@ -103,7 +103,7 @@ public class DeleteStatement extends ModificationStatement this.whereClause = whereClause; } - protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException + protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames, Attributes attrs) throws InvalidRequestException { DeleteStatement stmt = new DeleteStatement(getBoundsTerms(), cfDef.cfm, attrs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 7766f94..f6b7140 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -85,19 +85,19 @@ public abstract class ModificationStatement implements CQLStatement return cfm.getDefaultValidator().isCommutative(); } - public int getTimeToLive() + public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException { - return attrs.timeToLive; + return attrs.getTimestamp(now, variables); } - public long getTimestamp(long now) + public boolean isTimestampSet() { - return attrs.timestamp == null ? now : attrs.timestamp; + return attrs.isTimestampSet(); } - public boolean isSetTimestamp() + public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException { - return attrs.timestamp != null; + return attrs.getTimeToLive(variables); } public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException @@ -107,7 +107,6 @@ public abstract class ModificationStatement implements CQLStatement public void validate(ClientState state) throws InvalidRequestException { - attrs.validate(); } public void addOperation(Operation op) @@ -363,7 +362,7 @@ public abstract class ModificationStatement implements CQLStatement throw new InvalidRequestException("IN on the partition key is not supported with conditional updates"); ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables); - UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(queryState.getTimestamp()), getTimeToLive(), null); + UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(queryState.getTimestamp(), variables), getTimeToLive(variables), null); ByteBuffer key = keys.get(0); ThriftValidation.validateKey(cfm, key); @@ -407,7 +406,7 @@ public abstract class ModificationStatement implements CQLStatement // Some lists operation requires reading Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl); - UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now), getTimeToLive(), rows); + UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows); Collection<IMutation> mutations = new ArrayList<IMutation>(); for (ByteBuffer key: keys) @@ -449,11 +448,11 @@ public abstract class ModificationStatement implements CQLStatement public static abstract class Parsed extends CFStatement { - protected final Attributes attrs; + protected final Attributes.Raw attrs; private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions; private final boolean ifNotExists; - protected Parsed(CFName name, Attributes attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists) + protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists) { super(name); this.attrs = attrs; @@ -473,7 +472,10 @@ public abstract class ModificationStatement implements CQLStatement CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); CFDefinition cfDef = metadata.getCfDef(); - ModificationStatement stmt = prepareInternal(cfDef, boundNames); + Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily()); + preparedAttributes.collectMarkerSpecification(boundNames); + + ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes); if (ifNotExists || (conditions != null && !conditions.isEmpty())) { @@ -528,6 +530,6 @@ public abstract class ModificationStatement implements CQLStatement return stmt; } - protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException; + protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames, Attributes attrs) throws InvalidRequestException; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index d45b730..9630771 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -58,6 +58,7 @@ public class SelectStatement implements CQLStatement public final CFDefinition cfDef; public final Parameters parameters; private final Selection selection; + private final Term limit; private final Restriction[] keyRestrictions; private final Restriction[] columnRestrictions; @@ -92,7 +93,7 @@ public class SelectStatement implements CQLStatement } } - public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters parameters, Selection selection) + public SelectStatement(CFDefinition cfDef, int boundTerms, Parameters parameters, Selection selection, Term limit) { this.cfDef = cfDef; this.boundTerms = boundTerms; @@ -100,6 +101,7 @@ public class SelectStatement implements CQLStatement this.keyRestrictions = new Restriction[cfDef.keys.size()]; this.columnRestrictions = new Restriction[cfDef.columns.size()]; this.parameters = parameters; + this.limit = limit; } public int getBoundsTerms() @@ -124,17 +126,18 @@ public class SelectStatement implements CQLStatement cl.validateForRead(keyspace()); + int limit = getLimit(variables); List<Row> rows = isKeyRange || usesSecondaryIndexing - ? StorageProxy.getRangeSlice(getRangeCommand(variables), cl) - : StorageProxy.read(getSliceCommands(variables), cl); + ? StorageProxy.getRangeSlice(getRangeCommand(variables, limit), cl) + : StorageProxy.read(getSliceCommands(variables, limit), cl); - return processResults(rows, variables); + return processResults(rows, variables, limit); } - private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables) throws RequestValidationException + private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit) throws RequestValidationException { // Even for count, we need to process the result as it'll group some column together in sparse column families - ResultSet rset = process(rows, variables); + ResultSet rset = process(rows, variables, limit); rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset; return new ResultMessage.Rows(rset); } @@ -150,17 +153,19 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException { + List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList(); + int limit = getLimit(variables); List<Row> rows = isKeyRange || usesSecondaryIndexing - ? RangeSliceVerbHandler.executeLocally(getRangeCommand(Collections.<ByteBuffer>emptyList())) - : readLocally(keyspace(), getSliceCommands(Collections.<ByteBuffer>emptyList())); + ? RangeSliceVerbHandler.executeLocally(getRangeCommand(variables, limit)) + : readLocally(keyspace(), getSliceCommands(variables, limit)); - return processResults(rows, Collections.<ByteBuffer>emptyList()); + return processResults(rows, variables, limit); } public ResultSet process(List<Row> rows) throws InvalidRequestException { assert !parameters.isCount; // not yet needed - return process(rows, Collections.<ByteBuffer>emptyList()); + return process(rows, Collections.<ByteBuffer>emptyList(), getLimit(Collections.<ByteBuffer>emptyList())); } public String keyspace() @@ -173,12 +178,12 @@ public class SelectStatement implements CQLStatement return cfDef.cfm.cfName; } - private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables) throws RequestValidationException + private List<ReadCommand> getSliceCommands(List<ByteBuffer> variables, int limit) throws RequestValidationException { Collection<ByteBuffer> keys = getKeys(variables); List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size()); - IDiskAtomFilter filter = makeFilter(variables); + IDiskAtomFilter filter = makeFilter(variables, limit); // Note that we use the total limit for every key, which is potentially inefficient. // However, IN + LIMIT is not a very sensible choice. for (ByteBuffer key : keys) @@ -192,9 +197,9 @@ public class SelectStatement implements CQLStatement return commands; } - private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables) throws RequestValidationException + private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int limit) throws RequestValidationException { - IDiskAtomFilter filter = makeFilter(variables); + IDiskAtomFilter filter = makeFilter(variables, limit); List<IndexExpression> expressions = getIndexExpressions(variables); // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. @@ -203,7 +208,7 @@ public class SelectStatement implements CQLStatement filter, getKeyBounds(variables), expressions, - getLimit(), + limit, true, false); } @@ -252,7 +257,7 @@ public class SelectStatement implements CQLStatement return bounds; } - private IDiskAtomFilter makeFilter(List<ByteBuffer> variables) + private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit) throws InvalidRequestException { if (isColumnRange()) @@ -266,7 +271,7 @@ public class SelectStatement implements CQLStatement getRequestedBound(Bound.END, variables)); SliceQueryFilter filter = new SliceQueryFilter(new ColumnSlice[]{slice}, isReversed, - getLimit(), + limit, toGroup); QueryProcessor.validateSliceFilter(cfDef.cfm, filter); return filter; @@ -279,13 +284,35 @@ public class SelectStatement implements CQLStatement } } - private int getLimit() + private int getLimit(List<ByteBuffer> variables) throws InvalidRequestException { + int l = Integer.MAX_VALUE; + if (limit != null) + { + ByteBuffer b = limit.bindAndGet(variables); + if (b == null) + throw new InvalidRequestException("Invalid null value of limit"); + + try + { + Int32Type.instance.validate(b); + l = Int32Type.instance.compose(b); + } + catch (MarshalException e) + { + throw new InvalidRequestException("Invalid limit value"); + } + } + + if (l <= 0) + throw new InvalidRequestException("LIMIT must be strictly positive"); + // Internally, we don't support exclusive bounds for slices. Instead, // we query one more element if necessary and exclude - return sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) && parameters.limit != Integer.MAX_VALUE - ? parameters.limit + 1 - : parameters.limit; + if (sliceRestriction != null && !sliceRestriction.isInclusive(Bound.START) && l != Integer.MAX_VALUE) + l += 1; + + return l; } private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException @@ -634,7 +661,7 @@ public class SelectStatement implements CQLStatement }; } - private ResultSet process(List<Row> rows, List<ByteBuffer> variables) throws InvalidRequestException + private ResultSet process(List<Row> rows, List<ByteBuffer> variables, int limit) throws InvalidRequestException { Selection.ResultSetBuilder result = selection.resultSetBuilder(); for (org.apache.cassandra.db.Row row : rows) @@ -740,7 +767,7 @@ public class SelectStatement implements CQLStatement cqlRows.reverse(); // Trim result if needed to respect the limit - cqlRows.trim(parameters.limit); + cqlRows.trim(limit); return cqlRows; } @@ -839,22 +866,21 @@ public class SelectStatement implements CQLStatement private final Parameters parameters; private final List<RawSelector> selectClause; private final List<Relation> whereClause; + private final Term.Raw limit; - public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, List<Relation> whereClause) + public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, List<Relation> whereClause, Term.Raw limit) { super(cfName); this.parameters = parameters; this.selectClause = selectClause; this.whereClause = whereClause == null ? Collections.<Relation>emptyList() : whereClause; + this.limit = limit; } public ParsedStatement.Prepared prepare() throws InvalidRequestException { CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily()); - if (parameters.limit <= 0) - throw new InvalidRequestException("LIMIT must be strictly positive"); - CFDefinition cfDef = cfm.getCfDef(); ColumnSpecification[] names = new ColumnSpecification[getBoundsTerms()]; @@ -868,7 +894,14 @@ public class SelectStatement implements CQLStatement ? Selection.wildcard(cfDef) : Selection.fromSelectors(cfDef, selectClause); - SelectStatement stmt = new SelectStatement(cfDef, getBoundsTerms(), parameters, selection); + Term prepLimit = null; + if (limit != null) + { + prepLimit = limit.prepare(limitReceiver()); + prepLimit.collectMarkerSpecification(names); + } + + SelectStatement stmt = new SelectStatement(cfDef, getBoundsTerms(), parameters, selection, prepLimit); /* * WHERE clause. For a given entity, rules are: @@ -1209,6 +1242,11 @@ public class SelectStatement implements CQLStatement }); } + private ColumnSpecification limitReceiver() + { + return new ColumnSpecification(keyspace(), columnFamily(), new ColumnIdentifier("[limit]", true), Int32Type.instance); + } + Restriction updateRestriction(CFDefinition.Name name, Restriction restriction, Relation newRel, ColumnSpecification[] boundNames) throws InvalidRequestException { ColumnSpecification receiver = name; @@ -1265,12 +1303,11 @@ public class SelectStatement implements CQLStatement @Override public String toString() { - return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s, limit=%s]", + return String.format("SelectRawStatement[name=%s, selectClause=%s, whereClause=%s, isCount=%s]", cfName, selectClause, whereClause, - parameters.isCount, - parameters.limit); + parameters.isCount); } } @@ -1409,15 +1446,13 @@ public class SelectStatement implements CQLStatement public static class Parameters { - private final int limit; private final Map<ColumnIdentifier, Boolean> orderings; private final boolean isCount; private final ColumnIdentifier countAlias; private final boolean allowFiltering; - public Parameters(int limit, Map<ColumnIdentifier, Boolean> orderings, boolean isCount, ColumnIdentifier countAlias, boolean allowFiltering) + public Parameters(Map<ColumnIdentifier, Boolean> orderings, boolean isCount, ColumnIdentifier countAlias, boolean allowFiltering) { - this.limit = limit; this.orderings = orderings; this.isCount = isCount; this.countAlias = countAlias; http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index bc05dd6..cff4105 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -115,14 +115,14 @@ public class UpdateStatement extends ModificationStatement * @param columnValues list of column values (corresponds to names) * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ - public ParsedInsert(CFName name, Attributes attrs, List<ColumnIdentifier> columnNames, List<Term.Raw> columnValues) + public ParsedInsert(CFName name, Attributes.Raw attrs, List<ColumnIdentifier> columnNames, List<Term.Raw> columnValues) { super(name, attrs, Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList(), false); this.columnNames = columnNames; this.columnValues = columnValues; } - protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException + protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames, Attributes attrs) throws InvalidRequestException { UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), cfDef.cfm, attrs); @@ -182,7 +182,7 @@ public class UpdateStatement extends ModificationStatement * @param whereClause the where clause */ public ParsedUpdate(CFName name, - Attributes attrs, + Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> updates, List<Relation> whereClause, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, @@ -193,7 +193,7 @@ public class UpdateStatement extends ModificationStatement this.whereClause = whereClause; } - protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames) throws InvalidRequestException + protected ModificationStatement prepareInternal(CFDefinition cfDef, ColumnSpecification[] boundNames, Attributes attrs) throws InvalidRequestException { UpdateStatement stmt = new UpdateStatement(getBoundsTerms(), cfDef.cfm, attrs); http://git-wip-us.apache.org/repos/asf/cassandra/blob/524261f8/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 3bec918..ed8aaaf 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -221,7 +221,7 @@ public class BatchMessage extends Message.Request // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor // (and no value would be really correct, so we prefer passing a clearly wrong one). - BatchStatement batch = new BatchStatement(-1, type, statements, new Attributes()); + BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none()); Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values); if (tracingId != null)