Author: eevans Date: Fri Nov 12 19:32:54 2010 New Revision: 1034536 URL: http://svn.apache.org/viewvc?rev=1034536&view=rev Log: add support for index scans
Patch by eevans Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java cassandra/trunk/test/system/test_cql.py Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1034536&r1=1034535&r2=1034536&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Nov 12 19:32:54 2010 @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeoutException; import org.antlr.runtime.ANTLRStringStream; @@ -47,6 +48,7 @@ import org.apache.cassandra.db.ReadComma import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.SliceByNamesReadCommand; import org.apache.cassandra.db.SliceFromReadCommand; +import org.apache.cassandra.db.Table; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; @@ -54,6 +56,9 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.thrift.IndexClause; +import org.apache.cassandra.thrift.IndexExpression; +import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; import org.slf4j.Logger; @@ -137,25 +142,8 @@ public class QueryProcessor AbstractBounds bounds = new Bounds(p.getToken(select.getKeyStart().getByteBuffer()), 
p.getToken(select.getKeyFinish().getByteBuffer())); - // XXX: Our use of Thrift structs internally makes me Sad. :( - SlicePredicate thriftSlicePredicate = new SlicePredicate(); - if (select.isColumnRange() || select.getColumnNames().size() == 0) - { - SliceRange sliceRange = new SliceRange(); - sliceRange.start = select.getColumnStart().getByteBuffer(); - sliceRange.finish = select.getColumnFinish().getByteBuffer(); - sliceRange.reversed = false; // FIXME: hard-coded - sliceRange.count = select.getColumnsLimit(); - thriftSlicePredicate.slice_range = sliceRange; - } - else - { - List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>(); - for (Term column : select.getColumnNames()) - columnNames.add(column.getByteBuffer()); - thriftSlicePredicate.column_names = columnNames; - } + SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select); try { @@ -182,6 +170,99 @@ public class QueryProcessor return rows; } + + private static List<org.apache.cassandra.db.Row> getIndexedSlices(String keyspace, SelectStatement select) + throws TimedOutException + { + // XXX: Our use of Thrift structs internally (still) makes me Sad. :~( + SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select); + + List<IndexExpression> expressions = new ArrayList<IndexExpression>(); + for (Relation columnRelation : select.getColumnRelations()) + { + expressions.add(new IndexExpression(columnRelation.getEntity().getByteBuffer(), + IndexOperator.valueOf(columnRelation.operator().toString()), + columnRelation.getValue().getByteBuffer())); + } + + ByteBuffer startKey = (!select.isKeyRange()) ? 
(new Term()).getByteBuffer() : select.getKeyStart().getByteBuffer(); + IndexClause thriftIndexClause = new IndexClause(expressions, startKey, select.getNumRecords()); + + List<org.apache.cassandra.db.Row> rows; + try + { + rows = StorageProxy.scan(keyspace, + select.getColumnFamily(), + thriftIndexClause, + thriftSlicePredicate, + select.getConsistencyLevel()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + throw new TimedOutException(); + } + + return rows; + } + + private static SlicePredicate slicePredicateFromSelect(SelectStatement select) + { + SlicePredicate thriftSlicePredicate = new SlicePredicate(); + + if (select.isColumnRange() || select.getColumnNames().size() == 0) + { + SliceRange sliceRange = new SliceRange(); + sliceRange.start = select.getColumnStart().getByteBuffer(); + sliceRange.finish = select.getColumnFinish().getByteBuffer(); + sliceRange.reversed = select.isColumnsReversed(); + sliceRange.count = select.getColumnsLimit(); + thriftSlicePredicate.slice_range = sliceRange; + } + else + { + List<ByteBuffer> columnNames = new ArrayList<ByteBuffer>(); + for (Term column : select.getColumnNames()) + columnNames.add(column.getByteBuffer()); + thriftSlicePredicate.column_names = columnNames; + } + + return thriftSlicePredicate; + } + + /* Test for SELECT-specific taboos */ + private static void validateSelect(String keyspace, SelectStatement select) throws InvalidRequestException + { + // Finish key w/o start key (KEY < foo) + if (!select.isKeyRange() && (select.getKeyFinish() != null)) + throw newInvalidRequestException("Key range clauses must include a start key (i.e. 
KEY > term)"); + + // Key range and by-key(s) combined (KEY > foo AND KEY = bar) + if (select.isKeyRange() && select.getKeys().size() > 0) + throw newInvalidRequestException("You cannot combine key range and by-key clauses in a SELECT"); + + // Start and finish keys, *and* column relations (KEY > foo AND KEY < bar and name1 = value1). + if (select.isKeyRange() && (select.getKeyFinish() != null) && (select.getColumnRelations().size() > 0)) + throw newInvalidRequestException("You cannot combine key range and by-column clauses in a SELECT"); + + // Multiget scenario (KEY = foo AND KEY = bar ...) + if (select.getKeys().size() > 1) + throw newInvalidRequestException("SELECTs can contain only one by-key clause"); + + if (select.getColumnRelations().size() > 0) + { + Set<ByteBuffer> indexed = Table.open(keyspace).getColumnFamilyStore(select.getColumnFamily()).getIndexedColumns(); + for (Relation relation : select.getColumnRelations()) + { + if ((relation.operator().equals(RelationType.EQ)) && indexed.contains(relation.getEntity().getByteBuffer())) + return; + } + throw newInvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator"); + } + } public static CqlResult process(String queryString, ClientState clientState) throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException @@ -199,27 +280,30 @@ public class QueryProcessor { case SELECT: SelectStatement select = (SelectStatement)statement.statement; + validateColumnFamily(keyspace, select.getColumnFamily()); + validateSelect(keyspace, select); List<CqlRow> avroRows = new ArrayList<CqlRow>(); avroResult.type = CqlResultType.ROWS; List<org.apache.cassandra.db.Row> rows = null; + // By-key if (!select.isKeyRange() && (select.getKeys().size() > 0)) { - // Multiple keys (aka "multiget") is not allowed( any longer). - if (select.getKeys().size() > 1) - throw newInvalidRequestException("SELECTs can contain only one by-key clause (i.e. 
KEY = TERM)"); - rows = getSlice(keyspace, select); } else { - // Combining key ranges and column index queries is not currently allowed - if (select.getColumnRelations().size() > 0) - throw newInvalidRequestException("You cannot combine key ranges and by-column clauses " + - "(i.e. \"name\" = \"value\") in a SELECT statement"); - - rows = multiRangeSlice(keyspace, select); + // Range query + if ((select.getKeyFinish() != null) || (select.getColumnRelations().size() == 0)) + { + rows = multiRangeSlice(keyspace, select); + } + // Index scan + else + { + rows = getIndexedSlices(keyspace, select); + } } // Create the result set Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java?rev=1034536&r1=1034535&r2=1034536&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Relation.java Fri Nov 12 19:32:54 2010 @@ -27,8 +27,8 @@ package org.apache.cassandra.cql; */ public class Relation { - public EntityType entityType = EntityType.COLUMN; - public Term entity; + private EntityType entityType = EntityType.COLUMN; + private Term entity; private RelationType relationType; private Term value; @@ -64,10 +64,20 @@ public class Relation return relationType; } + public Term getEntity() + { + return entity; + } + public Term getValue() { return value; } + + public String toString() + { + return String.format("Relation(%s, %s, %s)", entity, relationType, value); + } } enum EntityType Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java?rev=1034536&r1=1034535&r2=1034536&view=diff ============================================================================== --- 
cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Term.java Fri Nov 12 19:32:54 2010 @@ -90,6 +90,11 @@ public class Term return type; } + public String toString() + { + return String.format("Term(%s, type=%s)", getText(), type); + } + } enum TermType Modified: cassandra/trunk/test/system/test_cql.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1034536&r1=1034535&r2=1034536&view=diff ============================================================================== --- cassandra/trunk/test/system/test_cql.py (original) +++ cassandra/trunk/test/system/test_cql.py Fri Nov 12 19:32:54 2010 @@ -30,6 +30,16 @@ def load_sample(dbconn): ROW("af", COL(1L, "1"), COL(2L, "2"), COL(3L, "3"), COL(4L, "4")) AND ROW("ag", COL(5L, "5"), COL(6L, "6"), COL(7L, "8"), COL(9L, "9")); """) + dbconn.execute(""" + UPDATE + Indexed1 + WITH + ROW("asmith", COL("birthdate", 100L), COL("unindexed", 250L)) AND + ROW("dozer", COL("birthdate", 100L), COL("unindexed", 200L)) AND + ROW("morpheus", COL("birthdate", 175L), COL("unindexed", 200L)) AND + ROW("neo", COL("birthdate", 150L), COL("unindexed", 250L)) AND + ROW("trinity", COL("birthdate", 125L), COL("unindexed", 200L)); + """) def init(keyspace="Keyspace1"): dbconn = Connection(keyspace, 'localhost', 9170) @@ -104,3 +114,30 @@ class TestCql(AvroTester): query = 'SELECT "col" FROM Standard1 WHERE KEY = "ka" AND KEY = "kb";' assert_raises(CQLException, conn.execute, query) + def test_index_scan_equality(self): + "indexed scan where column equals value" + conn = init() + r = conn.execute('SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L') + assert len(r) == 2 + assert r[0]['key'] == "asmith" + assert r[1]['key'] == "dozer" + assert len(r[0]['columns']) == 1 + assert len(r[1]['columns']) == 1 + + def test_index_scan_greater_than(self): + "indexed scan where a column is greater than a value" + conn = init() + r = conn.execute(""" + 
SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L AND "unindexed" > 200L + """) + assert len(r) == 1 + assert r[0]['key'] == "asmith" + + def test_index_scan_with_start_key(self): + "indexed scan with a starting key" + conn = init() + r = conn.execute(""" + SELECT "birthdate" FROM Indexed1 WHERE "birthdate" = 100L AND KEY > "asmithZ" + """) + assert len(r) == 1 + assert r[0]['key'] == "dozer"