Author: eevans Date: Wed Dec 14 22:52:54 2011 New Revision: 1214520 URL: http://svn.apache.org/viewvc?rev=1214520&view=rev Log: CQL support for prepared statements
Patch by Rick Shaw; reviewed by eevans for CASSANDRA-2475 Modified: cassandra/trunk/interface/cassandra.thrift cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java cassandra/trunk/src/java/org/apache/cassandra/cql/WhereClause.java cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Modified: cassandra/trunk/interface/cassandra.thrift URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/interface/cassandra.thrift (original) +++ cassandra/trunk/interface/cassandra.thrift Wed Dec 14 22:52:54 2011 @@ -46,7 +46,7 @@ namespace rb CassandraThrift # for every edit that doesn't result in a change to major/minor. # # See the Semantic Versioning Specification (SemVer) http://semver.org. -const string VERSION = "19.19.0" +const string VERSION = "19.22.0" # @@ -461,6 +461,12 @@ struct CqlResult { 4: optional CqlMetadata schema } +struct CqlPreparedResult { + 1: required i32 itemId, + 2: required i32 count +} + + service Cassandra { # auth methods void login(1: required AuthenticationRequest auth_request) throws (1:AuthenticationException authnx, 2:AuthorizationException authzx), @@ -683,4 +689,27 @@ service Cassandra { 2:UnavailableException ue, 3:TimedOutException te, 4:SchemaDisagreementException sde) + + + /** + * Prepare a CQL (Cassandra Query Language) statement by compiling and returning + * - the type of CQL statement + * - an id token of the compiled CQL stored on the server side. + * - a count of the discovered bound markers in the statement + */ + CqlPreparedResult prepare_cql_query(1:required binary query, 2:required Compression compression) + throws (1:InvalidRequestException ire) + + + /** + * Executes a prepared CQL (Cassandra Query Language) statement by passing an id token and a list of variables + * to bind and returns a CqlResult containing the results. + */ + CqlResult execute_prepared_cql_query(1:required i32 itemId, 2:required list<string> values) + throws (1:InvalidRequestException ire, + 2:UnavailableException ue, + 3:TimedOutException te, + 4:SchemaDisagreementException sde) + + } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/AbstractModification.java Wed Dec 14 22:52:54 2011 @@ -103,7 +103,7 @@ public abstract class AbstractModificati * * @throws InvalidRequestException on the wrong request */ - public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) + public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<String> variables) throws org.apache.cassandra.thrift.InvalidRequestException; /** @@ -117,6 +117,6 @@ public abstract class AbstractModificati * * @throws InvalidRequestException on the wrong request */ - public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) + public abstract List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<String> variables) throws org.apache.cassandra.thrift.InvalidRequestException; } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/BatchStatement.java Wed Dec 14 22:52:54 2011 @@ -76,12 +76,13 @@ public class BatchStatement return timeToLive; } - public List<IMutation> getMutations(String keyspace, ClientState clientState) throws InvalidRequestException + public List<IMutation> getMutations(String keyspace, ClientState clientState, List<String> variables) + throws InvalidRequestException { List<IMutation> batch = new LinkedList<IMutation>(); for (AbstractModification statement : statements) { - batch.addAll(statement.prepareRowMutations(keyspace, clientState, timestamp)); + batch.addAll(statement.prepareRowMutations(keyspace, clientState, timestamp, variables)); } return batch; Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/CQLStatement.java Wed Dec 14 22:52:54 2011 @@ -24,6 +24,7 @@ public class CQLStatement { public StatementType type; public Object statement; + public int boundTerms = 0; public CQLStatement(StatementType type, Object statement) { Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Wed Dec 14 22:52:54 2011 @@ -450,7 +450,7 @@ comparatorType ; term returns [Term item] - : ( t=K_KEY | t=STRING_LITERAL | t=INTEGER | t=UUID | t=IDENT | t=FLOAT) { $item = new Term($t.text, $t.type); } + : ( t=K_KEY | t=STRING_LITERAL | t=INTEGER | t=UUID | t=IDENT | t=FLOAT | t=QMARK) { $item = new Term($t.text, $t.type); } ; termList returns [List<Term> items] @@ -597,6 +597,11 @@ RANGEOP INTEGER : '-'? DIGIT+ ; + +QMARK + : '?' + ; + /* Normally a lexer only emits one token at a time, but ours is tricked out * to support multiple (see @lexer::members near the top of the grammar). Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java Wed Dec 14 22:52:54 2011 @@ -115,7 +115,7 @@ public class CreateColumnFamilyStatement } /** Perform validation of parsed params */ - private void validate() throws InvalidRequestException + private void validate(List<String> variables) throws InvalidRequestException { // Column family name if (!name.matches("\\w+")) @@ -174,7 +174,7 @@ public class CreateColumnFamilyStatement for (Map.Entry<Term, String> column : columns.entrySet()) { - ByteBuffer name = column.getKey().getByteBuffer(comparator); + ByteBuffer name = column.getKey().getByteBuffer(comparator, variables); if (keyAlias != null && keyAlias.equals(name)) throw new InvalidRequestException("Invalid column name: " @@ -271,9 +271,9 @@ public class CreateColumnFamilyStatement * @return a CFMetaData instance corresponding to the values parsed from this statement * @throws InvalidRequestException on failure to validate parsed parameters */ - public CFMetaData getCFMetaData(String keyspace) throws InvalidRequestException + public CFMetaData getCFMetaData(String keyspace, List<String> variables) throws InvalidRequestException { - validate(); + validate(variables); CFMetaData newCFMD; try @@ -367,4 +367,10 @@ public class CreateColumnFamilyStatement } return result; } + + public Map<Term, String> getColumns() + { + return columns; + } + } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/DeleteStatement.java Wed Dec 14 22:52:54 2011 @@ -59,20 +59,17 @@ public class DeleteStatement extends Abs return columns; } - /** {@inheritDoc} */ public List<Term> getKeys() { return keys; } - /** {@inheritDoc} */ - public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException + public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<String> variables) throws InvalidRequestException { - return prepareRowMutations(keyspace, clientState, null); + return prepareRowMutations(keyspace, clientState, null, variables); } - /** {@inheritDoc} */ - public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException + public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<String> variables) throws InvalidRequestException { clientState.hasColumnFamilyAccess(columnFamily, Permission.WRITE); AbstractType<?> keyType = Schema.instance.getCFMetaData(keyspace, columnFamily).getKeyValidator(); @@ -81,20 +78,21 @@ public class DeleteStatement extends Abs for (Term key : keys) { - rowMutations.add(mutationForKey(key.getByteBuffer(keyType), keyspace, timestamp, clientState)); + rowMutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState,variables)); } return rowMutations; } - /** {@inheritDoc} */ - public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ClientState clientState) throws InvalidRequestException + public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ClientState clientState, List<String> variables) + throws InvalidRequestException { RowMutation rm = new RowMutation(keyspace, key); CFMetaData metadata = validateColumnFamily(keyspace, columnFamily); QueryProcessor.validateKeyAlias(metadata, keyName); + @SuppressWarnings("rawtypes") AbstractType comparator = metadata.getComparatorFor(null); if (columns.size() < 1) @@ -107,7 +105,7 @@ public class DeleteStatement extends Abs // Delete specific columns for (Term column : columns) { - ByteBuffer columnName = column.getByteBuffer(comparator); + ByteBuffer columnName = column.getByteBuffer(comparator, variables); validateColumnName(columnName); rm.delete(new QueryPath(columnFamily, null, columnName), (timestamp == null) ? getTimestamp(clientState) : timestamp); } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Wed Dec 14 22:52:54 2011 @@ -73,7 +73,7 @@ public class QueryProcessor public static final String DEFAULT_KEY_NAME = bufferToString(CFMetaData.DEFAULT_KEY_NAME); - private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select) + private static List<org.apache.cassandra.db.Row> getSlice(CFMetaData metadata, SelectStatement select, List<String> variables) throws InvalidRequestException, TimedOutException, UnavailableException { QueryPath queryPath = new QueryPath(select.getColumnFamily()); @@ -82,12 +82,12 @@ public class QueryProcessor // ...of a list of column names if (!select.isColumnRange()) { - Collection<ByteBuffer> columnNames = getColumnNames(select, metadata); + Collection<ByteBuffer> columnNames = getColumnNames(select, metadata, variables); validateColumnNames(columnNames); for (Term rawKey: select.getKeys()) { - ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator()); + ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables); validateKey(key); commands.add(new SliceByNamesReadCommand(metadata.ksName, key, queryPath, columnNames)); @@ -97,12 +97,12 @@ public class QueryProcessor else { AbstractType<?> comparator = select.getComparator(metadata.ksName); - ByteBuffer start = select.getColumnStart().getByteBuffer(comparator); - ByteBuffer finish = select.getColumnFinish().getByteBuffer(comparator); + ByteBuffer start = select.getColumnStart().getByteBuffer(comparator,variables); + ByteBuffer finish = select.getColumnFinish().getByteBuffer(comparator,variables); for (Term rawKey : select.getKeys()) { - ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator()); + ByteBuffer key = rawKey.getByteBuffer(metadata.getKeyValidator(),variables); validateKey(key); validateSliceRange(metadata, start, finish, select.isColumnsReversed()); @@ -130,7 +130,8 @@ public class QueryProcessor } } - private static List<ByteBuffer> getColumnNames(SelectStatement select, CFMetaData metadata) throws InvalidRequestException + private static List<ByteBuffer> getColumnNames(SelectStatement select, CFMetaData metadata, List<String> variables) + throws InvalidRequestException { String keyString = getKeyString(metadata); List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>(); @@ -138,12 +139,12 @@ public class QueryProcessor { // skip the key for the slice op; we'll add it to the resultset in extractThriftColumns if (!column.getText().equalsIgnoreCase(keyString)) - columnNames.add(column.getByteBuffer(metadata.comparator)); + columnNames.add(column.getByteBuffer(metadata.comparator,variables)); } return columnNames; } - private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select) + private static List<org.apache.cassandra.db.Row> multiRangeSlice(CFMetaData metadata, SelectStatement select, List<String> variables) throws TimedOutException, UnavailableException, InvalidRequestException { List<org.apache.cassandra.db.Row> rows; @@ -152,11 +153,11 @@ public class QueryProcessor AbstractType<?> keyType = Schema.instance.getCFMetaData(metadata.ksName, select.getColumnFamily()).getKeyValidator(); ByteBuffer startKeyBytes = (select.getKeyStart() != null) - ? select.getKeyStart().getByteBuffer(keyType) + ? select.getKeyStart().getByteBuffer(keyType,variables) : null; ByteBuffer finishKeyBytes = (select.getKeyFinish() != null) - ? select.getKeyFinish().getByteBuffer(keyType) + ? select.getKeyFinish().getByteBuffer(keyType,variables) : null; RowPosition startKey = RowPosition.forKey(startKeyBytes, p), finishKey = RowPosition.forKey(finishKeyBytes, p); @@ -170,7 +171,7 @@ public class QueryProcessor AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(startKey, finishKey); // XXX: Our use of Thrift structs internally makes me Sad. :( - SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata); + SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata,variables); validateSlicePredicate(metadata, thriftSlicePredicate); int limit = select.isKeyRange() && select.getKeyStart() != null @@ -218,19 +219,19 @@ public class QueryProcessor return rows.subList(0, select.getNumRecords() < rows.size() ? select.getNumRecords() : rows.size()); } - private static List<org.apache.cassandra.db.Row> getIndexedSlices(CFMetaData metadata, SelectStatement select) + private static List<org.apache.cassandra.db.Row> getIndexedSlices(CFMetaData metadata, SelectStatement select, List<String> variables) throws TimedOutException, UnavailableException, InvalidRequestException { // XXX: Our use of Thrift structs internally (still) makes me Sad. :~( - SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata); + SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata, variables); validateSlicePredicate(metadata, thriftSlicePredicate); List<IndexExpression> expressions = new ArrayList<IndexExpression>(); for (Relation columnRelation : select.getColumnRelations()) { // Left and right side of relational expression encoded according to comparator/validator. - ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator); - ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity)); + ByteBuffer entity = columnRelation.getEntity().getByteBuffer(metadata.comparator, variables); + ByteBuffer value = columnRelation.getValue().getByteBuffer(select.getValueValidator(metadata.ksName, entity), variables); expressions.add(new IndexExpression(entity, IndexOperator.valueOf(columnRelation.operator().toString()), @@ -238,7 +239,7 @@ public class QueryProcessor } AbstractType<?> keyType = Schema.instance.getCFMetaData(metadata.ksName, select.getColumnFamily()).getKeyValidator(); - ByteBuffer startKey = (!select.isKeyRange()) ? (new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer(keyType); + ByteBuffer startKey = (!select.isKeyRange()) ? (new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer(keyType, variables); IndexClause thriftIndexClause = new IndexClause(expressions, startKey, select.getNumRecords()); List<org.apache.cassandra.db.Row> rows; @@ -262,7 +263,7 @@ public class QueryProcessor return rows; } - private static void batchUpdate(ClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency) + private static void batchUpdate(ClientState clientState, List<UpdateStatement> updateStatements, ConsistencyLevel consistency, List<String> variables ) throws InvalidRequestException, UnavailableException, TimedOutException { String globalKeyspace = clientState.getKeyspace(); @@ -280,7 +281,7 @@ public class QueryProcessor cfamsSeen.add(update.getColumnFamily()); } - rowMutations.addAll(update.prepareRowMutations(keyspace, clientState)); + rowMutations.addAll(update.prepareRowMutations(keyspace, clientState, variables)); } try @@ -297,7 +298,7 @@ public class QueryProcessor } } - private static SlicePredicate slicePredicateFromSelect(SelectStatement select, CFMetaData metadata) + private static SlicePredicate slicePredicateFromSelect(SelectStatement select, CFMetaData metadata, List<String> variables) throws InvalidRequestException { SlicePredicate thriftSlicePredicate = new SlicePredicate(); @@ -305,22 +306,22 @@ public class QueryProcessor if (select.isColumnRange() || select.getColumnNames().size() == 0) { SliceRange sliceRange = new SliceRange(); - sliceRange.start = select.getColumnStart().getByteBuffer(metadata.comparator); - sliceRange.finish = select.getColumnFinish().getByteBuffer(metadata.comparator); + sliceRange.start = select.getColumnStart().getByteBuffer(metadata.comparator, variables); + sliceRange.finish = select.getColumnFinish().getByteBuffer(metadata.comparator, variables); sliceRange.reversed = select.isColumnsReversed(); sliceRange.count = select.getColumnsLimit(); thriftSlicePredicate.slice_range = sliceRange; } else { - thriftSlicePredicate.column_names = getColumnNames(select, metadata); + thriftSlicePredicate.column_names = getColumnNames(select, metadata, variables); } return thriftSlicePredicate; } /* Test for SELECT-specific taboos */ - private static void validateSelect(String keyspace, SelectStatement select) throws InvalidRequestException + private static void validateSelect(String keyspace, SelectStatement select, List<String> variables) throws InvalidRequestException { ThriftValidation.validateConsistencyLevel(keyspace, select.getConsistencyLevel(), RequestType.READ); @@ -346,7 +347,7 @@ public class QueryProcessor Set<ByteBuffer> indexed = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).indexManager.getIndexedColumns(); for (Relation relation : select.getColumnRelations()) { - if ((relation.operator() == RelationType.EQ) && indexed.contains(relation.getEntity().getByteBuffer(comparator))) + if ((relation.operator() == RelationType.EQ) && indexed.contains(relation.getEntity().getByteBuffer(comparator, variables))) return; } throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator"); @@ -493,12 +494,125 @@ public class QueryProcessor Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); } - public static CqlResult process(String queryString, ClientState clientState) - throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + + private final static void maybeAddBoundTerm(CQLStatement statement, Term term) throws InvalidRequestException + { + if (term != null && term.isBindMarker()) + { + term.setBindIndex(statement.boundTerms++); + } + } + + public static void discoverBoundTerms(CQLStatement statement) throws InvalidRequestException + { + switch (statement.type) + { + case SELECT: + SelectStatement select = (SelectStatement)statement.statement; + if (logger.isTraceEnabled()) logger.trace(select.toString()); + + // handle the select expression first + if (!select.isColumnRange() ) + { + List<Term> list = select.getColumnNames(); + for (Term term : list) maybeAddBoundTerm(statement,term); + } + else + { + maybeAddBoundTerm(statement,select.getColumnStart()); + maybeAddBoundTerm(statement,select.getColumnFinish()); + } + + // next handle the WHERE clause NB order is VERY important + + // first check for a multi-key (IN) list + + if (select.isMultiKey()) + { + for (Term term : select.getKeys()) maybeAddBoundTerm(statement,term); + } + else if (!select.getColumnRelations().isEmpty()) + { + if (select.isKeyRange()) + { + maybeAddBoundTerm(statement,select.getKeyStart()); + maybeAddBoundTerm(statement,select.getKeyFinish()); + } + + for (Relation relation : select.getColumnRelations()) + { + maybeAddBoundTerm(statement,relation.getEntity()); + maybeAddBoundTerm(statement,relation.getValue()); + } + } + else + { + // maybe its empty or just a simple term + for (Term term : select.getKeys()) maybeAddBoundTerm(statement,term); + } + + break; + + case UPDATE: + UpdateStatement update = (UpdateStatement)statement.statement; + if (logger.isTraceEnabled()) logger.trace(update.toString()); + + // first handle the SET clause values that come in pairs for UPDATE. NB the order of the markers (?) + for (Map.Entry<Term, Operation> column : update.getColumns().entrySet()) + { + maybeAddBoundTerm(statement,column.getKey()); + maybeAddBoundTerm(statement,column.getValue().a); + } + + // now handle the key(s) in the WHERE clause + + for (Term term : update.getKeys()) maybeAddBoundTerm(statement,term); + break; + + case INSERT: // insert uses UpdateStatement but with different marker ordering + UpdateStatement insert = (UpdateStatement)statement.statement; + if (logger.isTraceEnabled()) logger.trace(insert.toString()); + + // first handle the INTO..VALUES clause values that are grouped in order for INSERT. NB the order of the markers (?) + for (Term term : insert.getColumnNames()) maybeAddBoundTerm(statement,term); + for (Term term : insert.getColumnValues()) maybeAddBoundTerm(statement,term); + + // now handle the key(s) in the VALUES clause + for (Term term : insert.getKeys()) maybeAddBoundTerm(statement,term); + break; + + case DELETE: + DeleteStatement delete = (DeleteStatement)statement.statement; + if (logger.isTraceEnabled()) logger.trace(delete.toString()); + + // first handle the columns list for DELETE. NB the order of the markers (?) + for (Term term : delete.getColumns()) maybeAddBoundTerm(statement,term); + + // now handle the key(s) in the WHERE clause + for (Term term : delete.getKeys()) maybeAddBoundTerm(statement,term); + break; + + case CREATE_COLUMNFAMILY: + CreateColumnFamilyStatement createCf = (CreateColumnFamilyStatement)statement.statement; + + // handle the left hand Terms. Not terribly useful but included for completeness + for (Term term : createCf.getColumns().keySet()) maybeAddBoundTerm(statement,term); + break; + + case CREATE_INDEX: + CreateIndexStatement createIdx = (CreateIndexStatement)statement.statement; + + // handle the column name Term. Not terribly useful but included for completeness + maybeAddBoundTerm(statement,createIdx.getColumnName()); + break; + + default: // all other statement types are a NOOP. + } + } + + public static CqlResult doTheStatement(CQLStatement statement,ClientState clientState, List<String> variables ) + throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException { - logger.trace("CQL QUERY: {}", queryString); - - CQLStatement statement = getStatement(queryString); String keyspace = null; // Some statements won't have (or don't need) a keyspace (think USE, or CREATE). @@ -507,7 +621,7 @@ public class QueryProcessor CqlResult result = new CqlResult(); - logger.debug("CQL statement type: {}", statement.type.toString()); + if (logger.isDebugEnabled()) logger.debug("CQL statement type: {}", statement.type.toString()); CFMetaData metadata; switch (statement.type) { @@ -535,26 +649,26 @@ public class QueryProcessor if (select.getKeys().size() > 0) validateKeyAlias(metadata, select.getKeyAlias()); - validateSelect(keyspace, select); + validateSelect(keyspace, select, variables); List<org.apache.cassandra.db.Row> rows; // By-key if (!select.isKeyRange() && (select.getKeys().size() > 0)) { - rows = getSlice(metadata, select); + rows = getSlice(metadata, select, variables); } else { // Range query if ((select.getKeyFinish() != null) || (select.getColumnRelations().size() == 0)) { - rows = multiRangeSlice(metadata, select); + rows = multiRangeSlice(metadata, select, variables); } // Index scan else { - rows = getIndexedSlices(metadata, select); + rows = getIndexedSlices(metadata, select, variables); } } @@ -632,7 +746,7 @@ public class QueryProcessor ByteBuffer name; try { - name = term.getByteBuffer(metadata.comparator); + name = term.getByteBuffer(metadata.comparator, variables); } catch (InvalidRequestException e) { @@ -666,7 +780,7 @@ public class QueryProcessor case UPDATE: UpdateStatement update = (UpdateStatement)statement.statement; ThriftValidation.validateConsistencyLevel(keyspace, update.getConsistencyLevel(), RequestType.WRITE); - batchUpdate(clientState, Collections.singletonList(update), update.getConsistencyLevel()); + batchUpdate(clientState, Collections.singletonList(update), update.getConsistencyLevel(),variables); result.type = CqlResultType.VOID; return result; @@ -690,7 +804,7 @@ public class QueryProcessor try { - StorageProxy.mutate(batch.getMutations(keyspace, clientState), batch.getConsistencyLevel()); + StorageProxy.mutate(batch.getMutations(keyspace, clientState, variables), batch.getConsistencyLevel()); } catch (org.apache.cassandra.thrift.UnavailableException e) { @@ -740,7 +854,7 @@ public class QueryProcessor try { - StorageProxy.mutate(delete.prepareRowMutations(keyspace, clientState), delete.getConsistencyLevel()); + StorageProxy.mutate(delete.prepareRowMutations(keyspace, clientState, variables), delete.getConsistencyLevel()); } catch (TimeoutException e) { @@ -786,7 +900,7 @@ public class QueryProcessor CreateColumnFamilyStatement createCf = (CreateColumnFamilyStatement)statement.statement; clientState.hasColumnFamilySchemaAccess(Permission.WRITE); validateSchemaAgreement(); - CFMetaData cfmd = createCf.getCFMetaData(keyspace); + CFMetaData cfmd = createCf.getCFMetaData(keyspace,variables); ThriftValidation.validateCfDef(cfmd.toThrift(), null); try @@ -828,7 +942,8 @@ public class QueryProcessor { if (cd.index_type != null) throw new InvalidRequestException("Index already exists"); - logger.debug("Updating column {} definition for index {}", oldCfm.comparator.getString(columnName), createIdx.getIndexName()); + if (logger.isDebugEnabled()) + logger.debug("Updating column {} definition for index {}", oldCfm.comparator.getString(columnName), createIdx.getIndexName()); cd.setIndex_type(IndexType.KEYS); cd.setIndex_name(createIdx.getIndexName()); columnExists = true; @@ -971,10 +1086,53 @@ public class QueryProcessor result.type = CqlResultType.VOID; return result; } - return null; // We should never get here. } + public static CqlResult process(String queryString, ClientState clientState) + throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + { + if (logger.isDebugEnabled()) logger.debug("CQL QUERY: {}", queryString); + + CQLStatement statement = getStatement(queryString); + + CqlResult result = doTheStatement(statement, clientState, new ArrayList<String>()); + + return result; + } + + public static CQLStatement prepare (String queryString, ClientState clientState) + throws RecognitionException, InvalidRequestException + { + if (logger.isDebugEnabled()) logger.debug("CQL QUERY: {}", queryString); + + CQLStatement statement = getStatement(queryString); + + return statement; + } + + public static CqlResult process_prepared(CQLStatement statement, ClientState clientState, List<String> variables) + throws UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException + { + // Check to see if there are any bound variables to verify + if (!(variables.isEmpty() && (statement.boundTerms==0))) + { + if (variables.size() != statement.boundTerms) + throw new InvalidRequestException(String.format("there were %d markers(?) in CQL but %d bound variables", + statement.boundTerms, variables.size())); + + // at this point there is a match in count between markers and variables that is non-zero + + if (logger.isTraceEnabled()) + for (int i = 0; i < variables.size(); i++) logger.trace("[{}] '{}'",i+1,variables.get(i)); + } + + CqlResult result = doTheStatement(statement, clientState, variables); + + return result; + } + + private static Column thriftify(IColumn c) { ByteBuffer value = (c instanceof CounterColumn) Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectExpression.java Wed Dec 14 22:52:54 2011 @@ -147,4 +147,18 @@ public class SelectExpression { return wildcard; } + + public String toString() + { + return String.format("SelectExpression [numColumns=%s, reverseColumns=%s, hasFirstSet=%s, wildcard=%s, start=%s, finish=%s, columns=%s]", + numColumns, + reverseColumns, + hasFirstSet, + wildcard, + start, + finish, + columns); + } + + } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/SelectStatement.java Wed Dec 14 22:52:54 2011 @@ -185,4 +185,17 @@ public class SelectStatement return Schema.instance.getValueValidator(keyspace, columnFamily, column); } + public String toString() + { + return String.format("SelectStatement [expression=%s, isCountOper=%s, columnFamily=%s, keyspace=%s, cLevel=%s, clause=%s, numRecords=%s]", + expression, + isCountOper, + columnFamily, + keyspace, + cLevel, + clause, + numRecords); + } + + } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Wed Dec 14 22:52:54 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra.cql; import java.nio.ByteBuffer; +import java.util.List; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; @@ -36,6 +37,8 @@ public class Term private final String text; private final TermType type; + private Integer bindIndex; + /** * Create new Term instance from a string, and an integer that corresponds * with the token ID from CQLParser. @@ -66,6 +69,11 @@ public class Term this.text = ""; this.type = TermType.STRING; } + + public void setBindIndex(int bindIndex) + { + this.bindIndex = bindIndex; + } /** * Returns the text parsed to create this term. @@ -76,7 +84,7 @@ public class Term { return text; } - + /** * Returns the typed value, serialized to a ByteBuffer according to a * comparator/validator. @@ -84,11 +92,18 @@ public class Term * @return a ByteBuffer of the value. * @throws InvalidRequestException if unable to coerce the string to its type. */ - public ByteBuffer getByteBuffer(AbstractType<?> validator) throws InvalidRequestException + public ByteBuffer getByteBuffer(AbstractType<?> validator, List<String> variables) throws InvalidRequestException { try { - return validator.fromString(text); + if (!isBindMarker()) return validator.fromString(text); + + // must be a marker term so check for a CqlBindValue stored in the term + if (bindIndex==null) throw new AssertionError("a marker Term was encountered with no index value"); + + String bindValue = variables.get(bindIndex); + + return validator.fromString(bindValue); } catch (MarshalException e) { @@ -136,6 +151,11 @@ public class Term { return String.format("Term(%s, type=%s)", getText(), type); } + + public boolean isBindMarker() + { + return type==TermType.QMARK; + } @Override public int hashCode() @@ -157,6 +177,7 @@ public class Term if (getClass() != obj.getClass()) return false; Term other = (Term) obj; + if (type==TermType.QMARK) return false; // markers are never equal if (text == null) { if (other.text != null) @@ -173,7 +194,7 @@ public class Term enum TermType { - STRING, INTEGER, UUID, FLOAT; + STRING, INTEGER, UUID, FLOAT, QMARK; static TermType forInt(int type) { @@ -184,7 +205,9 @@ enum TermType else if (type == CqlParser.UUID) return UUID; else if (type == CqlParser.FLOAT) - return FLOAT; + return FLOAT; + else if (type == CqlParser.QMARK) + return QMARK; // FIXME: handled scenario that should never occur. return null; Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Wed Dec 14 22:52:54 2011 @@ -124,13 +124,13 @@ public class UpdateStatement extends Abs } /** {@inheritDoc} */ - public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException + public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, List<String> variables) throws InvalidRequestException { - return prepareRowMutations(keyspace, clientState, null); + return prepareRowMutations(keyspace, clientState, null, variables); } /** {@inheritDoc} */ - public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException + public List<IMutation> prepareRowMutations(String keyspace, ClientState clientState, Long timestamp, List<String> variables) throws InvalidRequestException { List<String> cfamsSeen = new ArrayList<String>(); @@ -162,7 +162,7 @@ public class UpdateStatement extends Abs for (Term key: keys) { - rowMutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace)), metadata, timestamp, clientState)); + rowMutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace),variables), metadata, timestamp, clientState, variables)); } return rowMutations; @@ -182,7 +182,8 @@ public class UpdateStatement extends Abs * * @throws InvalidRequestException on the wrong request */ - private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp, ClientState clientState) throws InvalidRequestException + private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp, ClientState clientState, List<String> variables) + throws InvalidRequestException { AbstractType<?> comparator = getComparator(keyspace); @@ -192,7 +193,7 @@ public class UpdateStatement extends Abs for (Map.Entry<Term, Operation> column : getColumns().entrySet()) { - ByteBuffer colName = column.getKey().getByteBuffer(comparator); + ByteBuffer colName = column.getKey().getByteBuffer(comparator, variables); Operation op = column.getValue(); if (op.isUnary()) @@ -200,7 +201,7 @@ public class UpdateStatement extends Abs if (hasCounterColumn) throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed."); - ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName)); + ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName),variables); validateColumn(metadata, colName, colValue); rm.add(new QueryPath(columnFamily, null, colName), @@ -239,7 +240,6 @@ public class UpdateStatement extends Abs return columnFamily; } - /** {@inheritDoc} */ public List<Term> getKeys() { return keys; @@ -293,4 +293,15 @@ public class UpdateStatement extends Abs { return Schema.instance.getValueValidator(keyspace, columnFamily, column); } + + public List<Term> getColumnNames() + { + return columnNames; + } + + public List<Term> getColumnValues() + { + return columnValues; + } + } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/WhereClause.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/WhereClause.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/WhereClause.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/WhereClause.java Wed Dec 14 22:52:54 2011 @@ -182,4 +182,19 @@ public class WhereClause } } } + + public String toString() + { + return String.format("WhereClause [keys=%s, startKey=%s, finishKey=%s, columns=%s, includeStartKey=%s, includeFinishKey=%s, multiKey=%s, keyAlias=%s]", + keys, + startKey, + finishKey, + columns, + includeStartKey, + includeFinishKey, + multiKey, + keyAlias); + } + + } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/ClientState.java Wed Dec 14 22:52:54 2011 @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,6 +31,7 @@ import org.apache.cassandra.auth.Authent import org.apache.cassandra.auth.Permission; import org.apache.cassandra.auth.Resources; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.db.Table; import org.apache.cassandra.thrift.AuthenticationException; import org.apache.cassandra.thrift.InvalidRequestException; @@ -48,6 +50,9 @@ public class ClientState // Reusable array for authorization private final List<Object> resource = new ArrayList<Object>(); + // a map of prepared statements index by an integer + private Map<Integer,CQLStatement> prepared = new HashMap<Integer,CQLStatement>(); + private long clock; /** @@ -58,6 +63,11 @@ public class ClientState reset(); } + public Map<Integer, CQLStatement> getPrepared() + { + return prepared; + } + public String getRawKeyspace() { return keyspace; @@ -114,6 +124,7 @@ public class ClientState user = DatabaseDescriptor.getAuthenticator().defaultUser(); keyspace = null; resourceClear(); + prepared.clear(); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1214520&r1=1214519&r2=1214520&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Wed Dec 14 22:52:54 2011 @@ -41,6 +41,7 @@ import org.apache.cassandra.auth.Permiss import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; +import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.cql.QueryProcessor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; @@ -985,7 +986,8 @@ public class CassandraServer implements } /** update an existing keyspace, but do not allow column family modifications. - * @throws SchemaDisagreementException */ + * @throws SchemaDisagreementException + */ public synchronized String system_update_keyspace(KsDef ks_def) throws InvalidRequestException, SchemaDisagreementException, TException { @@ -1149,9 +1151,8 @@ public class CassandraServer implements internal_remove(key, path, System.currentTimeMillis(), consistency_level, true); } - - public CqlResult execute_cql_query(ByteBuffer query, Compression compression) - throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException + + private static String uncompress(ByteBuffer query, Compression compression) throws InvalidRequestException { String queryString = null; @@ -1161,7 +1162,7 @@ public class CassandraServer implements switch (compression) { case GZIP: - FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream(); + FastByteArrayOutputStream byteArray = new FastByteArrayOutputStream(); byte[] outBuffer = new byte[1024], inBuffer = new byte[1024]; Inflater decompressor = new Inflater(); @@ -1206,7 +1207,16 @@ public class CassandraServer implements { throw new InvalidRequestException("Unknown query string encoding."); } + return queryString; + } + + public CqlResult execute_cql_query(ByteBuffer query, Compression compression) + throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException + { + if (logger.isDebugEnabled()) logger.debug("execute_cql_query"); + String queryString = uncompress(query,compression); + try { return QueryProcessor.process(queryString, state()); @@ -1218,6 +1228,54 @@ public class CassandraServer implements throw ire; } } + + private static final int makeItemId(String cql) + { + // use the hash of the string till something better is provided + return cql.hashCode(); + } + + public CqlPreparedResult prepare_cql_query(ByteBuffer query, Compression compression) + throws InvalidRequestException, TException + { + if (logger.isDebugEnabled()) logger.debug("prepare_cql_query"); + + String queryString = uncompress(query,compression); + int itemId = makeItemId(queryString); + + try + { + CQLStatement statement = QueryProcessor.prepare(queryString, state()); + + // discover all the marked Terms and hang them off of statement for use later + QueryProcessor.discoverBoundTerms(statement); + if (logger.isTraceEnabled()) logger.trace("Discovered "+ statement.boundTerms + " bound variables."); + + // put the prepared Statement into the Map + state().getPrepared().put(itemId, statement); + if (logger.isTraceEnabled()) logger.trace("Storing prepared statement: #"+ itemId + " count:"+state().getPrepared().size()); + return new CqlPreparedResult(itemId, statement.boundTerms); + } + catch (RecognitionException e) + { + InvalidRequestException ire = new InvalidRequestException("Invalid or malformed CQL query string"); + ire.initCause(e); + throw ire; + } + } + + + public CqlResult execute_prepared_cql_query(int itemId,List<String> bindVariables) + throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException + { + if (logger.isDebugEnabled()) logger.debug("execute_prepared_cql_query"); + + CQLStatement statement = state().getPrepared().get(itemId); + if (logger.isTraceEnabled()) logger.trace("Retreving prepared statement: #"+ itemId + " count:"+state().getPrepared().size()); + + CqlResult result = QueryProcessor.process_prepared(statement, state(), bindVariables); + return result; + } // main method moved to CassandraDaemon }