Author: eevans Date: Fri Dec 31 17:24:45 2010 New Revision: 1054140 URL: http://svn.apache.org/viewvc?rev=1054140&view=rev Log: port CQL server code, avro -> thrift
Patch by eevans; reviewed by jbellis for CASSANDRA-1913 Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=1054140&r1=1054139&r2=1054140&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Fri Dec 31 17:24:45 2010 @@ -1150,54 +1150,4 @@ public class CassandraServer implements } return null; } - - @Override - public CqlResult execute_cql_query(ByteBuffer query, Compression compression) - throws UnavailableException, InvalidRequestException, TimedOutException - { - String queryString = null; - - // Decompress the query string. - try - { - switch (compression) - { - case GZIP: - Inflater decompressor = new Inflater(); - decompressor.setInput(query.array(), 0, query.array().length); - - ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); - byte[] buffer = new byte[1024]; - - while (!decompressor.finished()) - { - int size = decompressor.inflate(buffer); - byteArray.write(buffer, 0, size); - } - - decompressor.end(); - - queryString = new String(byteArray.toByteArray(), 0, byteArray.size(), "UTF-8"); - } - } - catch (DataFormatException e) - { - throw newInvalidRequestException("Error deflating query string."); - } - catch (UnsupportedEncodingException e) - { - throw newInvalidRequestException("Unknown query string encoding."); - } - - try - { - return QueryProcessor.process(queryString, state()); - } - catch (RecognitionException e) - { - InvalidRequestException badQuery = newInvalidRequestException("Invalid or malformed CQL query string"); - badQuery.initCause(e); - throw badQuery; - } - } } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g?rev=1054140&r1=1054139&r2=1054140&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/Cql.g Fri Dec 31 17:24:45 2010 @@ -10,7 +10,7 @@ options { import java.util.HashMap; import java.util.Collections; import org.apache.cassandra.thrift.ConsistencyLevel; - import org.apache.cassandra.avro.InvalidRequestException; + import org.apache.cassandra.thrift.InvalidRequestException; } @members { @@ -31,11 +31,7 @@ options { public void throwLastRecognitionError() throws InvalidRequestException { if (recognitionErrors.size() > 0) - { - InvalidRequestException invalidExcep = new InvalidRequestException(); - invalidExcep.why = recognitionErrors.get((recognitionErrors.size()-1)); - throw invalidExcep; - } + throw new InvalidRequestException(recognitionErrors.get((recognitionErrors.size()-1))); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1054140&r1=1054139&r2=1054140&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Dec 31 17:24:45 2010 @@ -30,11 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.antlr.runtime.*; -import org.apache.cassandra.avro.Column; -import org.apache.cassandra.avro.*; -import org.apache.cassandra.avro.InvalidRequestException; -import org.apache.cassandra.avro.TimedOutException; -import org.apache.cassandra.avro.UnavailableException; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.dht.AbstractBounds; @@ -43,17 +38,11 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.IndexClause; -import org.apache.cassandra.thrift.IndexExpression; -import org.apache.cassandra.thrift.IndexOperator; -import org.apache.cassandra.thrift.SlicePredicate; -import org.apache.cassandra.thrift.SliceRange; +import org.apache.cassandra.thrift.Column; +import org.apache.cassandra.thrift.*; -import static org.apache.cassandra.avro.AvroErrorFactory.newInvalidRequestException; -import static org.apache.cassandra.avro.AvroErrorFactory.newUnavailableException; -import static org.apache.cassandra.avro.AvroValidation.validateColumnFamily; -import static org.apache.cassandra.avro.AvroValidation.validateKey; +import static org.apache.cassandra.thrift.ThriftValidation.validateKey; +import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily; public class QueryProcessor { @@ -104,16 +93,6 @@ public class QueryProcessor { throw new RuntimeException(e); } - catch (org.apache.cassandra.thrift.UnavailableException e) - { - UnavailableException error = new UnavailableException(); - error.initCause(e); - throw error; - } - catch (org.apache.cassandra.thrift.InvalidRequestException e) - { - throw newInvalidRequestException(e); - } return rows; } @@ -183,14 +162,6 @@ public class QueryProcessor thriftSlicePredicate, select.getConsistencyLevel()); } - catch (org.apache.cassandra.thrift.UnavailableException ex) - { - UnavailableException avroEx = new UnavailableException(); - avroEx.why = ex.getMessage(); - if (avroEx.why == null || avroEx.why.length() == 0) - avroEx.why = "StorageProxy.scan() failed because of insufficent responses."; - throw avroEx; - } catch (IOException e) { throw new RuntimeException(e); @@ -267,23 +238,23 @@ public class QueryProcessor private static void validateSelect(String keyspace, SelectStatement select) throws InvalidRequestException { if (select.isCountOperation() && (select.isKeyRange() || select.getKeys().size() < 1)) - throw newInvalidRequestException("Counts can only be performed for a single record (Hint: KEY=term)"); + throw new InvalidRequestException("Counts can only be performed for a single record (Hint: KEY=term)"); // Finish key w/o start key (KEY < foo) if (!select.isKeyRange() && (select.getKeyFinish() != null)) - throw newInvalidRequestException("Key range clauses must include a start key (i.e. KEY > term)"); + throw new InvalidRequestException("Key range clauses must include a start key (i.e. KEY > term)"); // Key range and by-key(s) combined (KEY > foo AND KEY = bar) if (select.isKeyRange() && select.getKeys().size() > 0) - throw newInvalidRequestException("You cannot combine key range and by-key clauses in a SELECT"); + throw new InvalidRequestException("You cannot combine key range and by-key clauses in a SELECT"); // Start and finish keys, *and* column relations (KEY > foo AND KEY < bar and name1 = value1). if (select.isKeyRange() && (select.getKeyFinish() != null) && (select.getColumnRelations().size() > 0)) - throw newInvalidRequestException("You cannot combine key range and by-column clauses in a SELECT"); + throw new InvalidRequestException("You cannot combine key range and by-column clauses in a SELECT"); // Multiget scenario (KEY = foo AND KEY = bar ...) if (select.getKeys().size() > 1) - throw newInvalidRequestException("SELECTs can contain only by by-key clause"); + throw new InvalidRequestException("SELECTs can contain only by by-key clause"); if (select.getColumnRelations().size() > 0) { @@ -293,7 +264,7 @@ public class QueryProcessor if ((relation.operator().equals(RelationType.EQ)) && indexed.contains(relation.getEntity().getByteBuffer())) return; } - throw newInvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator"); + throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator"); } } @@ -328,9 +299,9 @@ public class QueryProcessor { avroResult.type = CqlResultType.INT; if (rows.size() > 0) - avroResult.num = rows.get(0).cf != null ? rows.get(0).cf.getSortedColumns().size() : 0; + avroResult.setNum(rows.get(0).cf != null ? rows.get(0).cf.getSortedColumns().size() : 0); else - avroResult.num = 0; + avroResult.setNum(0); return avroResult; } } @@ -394,7 +365,7 @@ public class QueryProcessor for (UpdateStatement up : batch.getUpdates()) if (up.isSetConsistencyLevel()) - throw newInvalidRequestException( + throw new InvalidRequestException( "Consistency level must be set on the BATCH, not individual UPDATE statements"); batchUpdate(keyspace, batch.getUpdates(), batch.getConsistencyLevel()); @@ -414,17 +385,13 @@ public class QueryProcessor { StorageProxy.truncateBlocking(keyspace, columnFamily); } - catch (org.apache.cassandra.thrift.UnavailableException e) - { - throw newUnavailableException(e); - } catch (TimeoutException e) { - throw newUnavailableException(e); + throw (UnavailableException) new UnavailableException().initCause(e); } catch (IOException e) { - throw newUnavailableException(e); + throw (UnavailableException) new UnavailableException().initCause(e); } avroResult.type = CqlResultType.VOID; @@ -452,10 +419,6 @@ public class QueryProcessor { StorageProxy.mutate(rowMutations, delete.getConsistencyLevel()); } - catch (org.apache.cassandra.thrift.UnavailableException e) - { - throw newUnavailableException(e); - } catch (TimeoutException e) { throw new TimedOutException(); Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1054140&r1=1054139&r2=1054140&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Dec 31 17:24:45 2010 @@ -18,7 +18,10 @@ package org.apache.cassandra.thrift; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; import java.util.Map.Entry; @@ -26,14 +29,18 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; +import java.util.zip.DataFormatException; +import java.util.zip.Inflater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.antlr.runtime.RecognitionException; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.*; +import org.apache.cassandra.cql.QueryProcessor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.MarshalException; @@ -1127,5 +1134,64 @@ public class CassandraServer implements internal_remove(key, path, System.currentTimeMillis(), consistency_level); } + @Override + public CqlResult execute_cql_query(ByteBuffer query, Compression compression) + throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + String queryString = null; + + // Decompress the query string. + try + { + switch (compression) + { + case GZIP: + ByteArrayOutputStream byteArray = new ByteArrayOutputStream(); + byte[] outBuffer = new byte[1024], inBuffer = new byte[1024]; + + Inflater decompressor = new Inflater(); + + int lenRead = 0; + while (true) + { + if (decompressor.needsInput()) + lenRead = query.remaining() < 1024 ? query.remaining() : 1024; + query.get(inBuffer, 0, lenRead); + decompressor.setInput(inBuffer, 0, lenRead); + + int lenWrite = 0; + while ((lenWrite = decompressor.inflate(outBuffer)) !=0) + byteArray.write(outBuffer, 0, lenWrite); + + if (decompressor.finished()) + break; + } + + decompressor.end(); + + queryString = new String(byteArray.toByteArray(), 0, byteArray.size(), "UTF-8"); + } + } + catch (DataFormatException e) + { + throw new InvalidRequestException("Error deflating query string."); + } + catch (UnsupportedEncodingException e) + { + throw new InvalidRequestException("Unknown query string encoding."); + } + + try + { + return QueryProcessor.process(queryString, state()); + } + catch (RecognitionException e) + { + InvalidRequestException ire = new InvalidRequestException("Invalid or malformed CQL query string"); + ire.initCause(e); + throw ire; + } + } + // main method moved to CassandraDaemon } Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1054140&r1=1054139&r2=1054140&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Fri Dec 31 17:24:45 2010 @@ -44,7 +44,7 @@ public class ThriftValidation { private static final Logger logger = LoggerFactory.getLogger(ThriftValidation.class); - static void validateKey(ByteBuffer key) throws InvalidRequestException + public static void validateKey(ByteBuffer key) throws InvalidRequestException { if (key == null || key.remaining() == 0) {