Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 d7cb97005 -> 485feefb2
Adds QueryHandler interface patch by beobal & slebresne; reviewed by beobal & slebresne for CASSANDRA-6659 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/485feefb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/485feefb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/485feefb Branch: refs/heads/cassandra-2.0 Commit: 485feefb2503fa58f92a10ea5f310bf5a29029f2 Parents: d7cb970 Author: beobal <s...@beobal.com> Authored: Thu Mar 6 17:59:49 2014 +0000 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Mar 26 15:57:47 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cql3/BatchQueryOptions.java | 55 +++++++++ .../org/apache/cassandra/cql3/QueryHandler.java | 35 ++++++ .../apache/cassandra/cql3/QueryProcessor.java | 119 ++++--------------- .../cql3/hooks/BatchExecutionContext.java | 52 -------- .../cassandra/cql3/hooks/ExecutionContext.java | 47 -------- .../cassandra/cql3/hooks/PostExecutionHook.java | 52 -------- .../cql3/hooks/PostPreparationHook.java | 38 ------ .../cassandra/cql3/hooks/PreExecutionHook.java | 62 ---------- .../cql3/hooks/PreparationContext.java | 41 ------- .../cql3/statements/ListUsersStatement.java | 2 +- .../apache/cassandra/service/ClientState.java | 30 +++++ .../cassandra/thrift/CassandraServer.java | 12 +- .../transport/messages/BatchMessage.java | 9 +- .../transport/messages/ExecuteMessage.java | 7 +- .../transport/messages/PrepareMessage.java | 2 +- .../transport/messages/QueryMessage.java | 4 +- 17 files changed, 163 insertions(+), 405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e971df1..65f4f3b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -32,6 +32,7 @@ * Improve MeteredFlusher handling of MF-unaffected column families (CASSANDRA-6867) * Add CqlRecordReader using native pagination (CASSANDRA-6311) + * Add QueryHandler interface (CASSANDRA-6659) Merged from 1.2: * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816) * add extra SSL cipher suites (CASSANDRA-6613) http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java new file mode 100644 index 0000000..cbf5e92 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.cassandra.db.ConsistencyLevel; + +/** + * Options for a batch (at the protocol level) queries. + */ +public class BatchQueryOptions +{ + private final ConsistencyLevel consistency; + private final List<List<ByteBuffer>> values; + private final List<Object> queryOrIdList; + + public BatchQueryOptions(ConsistencyLevel cl, List<List<ByteBuffer>> values, List<Object> queryOrIdList) + { + this.consistency = cl; + this.values = values; + this.queryOrIdList = queryOrIdList; + } + + public ConsistencyLevel getConsistency() + { + return consistency; + } + + public List<List<ByteBuffer>> getValues() + { + return values; + } + + public List<Object> getQueryOrIdList() + { + return queryOrIdList; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/QueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java new file mode 100644 index 0000000..4d72333 --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.MD5Digest; + +public interface QueryHandler +{ + public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException; + public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException; + public CQLStatement getPrepared(MD5Digest id); + public CQLStatement getPreparedForThrift(Integer id); + public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException; + public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 167533f..fe818fd 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -19,7 +19,6 @@ package org.apache.cassandra.cql3; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.CopyOnWriteArrayList; import com.google.common.primitives.Ints; @@ -30,7 +29,6 @@ import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cql3.hooks.*; import org.apache.cassandra.cql3.statements.*; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.db.*; @@ -42,10 +40,12 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MD5Digest; import org.apache.cassandra.utils.SemanticVersion; -public class QueryProcessor +public class QueryProcessor implements QueryHandler { public static final SemanticVersion CQL_VERSION = new SemanticVersion("3.1.5"); + public static final QueryProcessor instance = new QueryProcessor(); + private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class); private static final MemoryMeter meter = new MemoryMeter(); private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256; @@ -98,46 +98,16 @@ public class QueryProcessor } } - private static final List<PreExecutionHook> preExecutionHooks = new CopyOnWriteArrayList<>(); - private static final List<PostExecutionHook> postExecutionHooks = new CopyOnWriteArrayList<>(); - private static final List<PostPreparationHook> postPreparationHooks = new CopyOnWriteArrayList<>(); - - public static void addPreExecutionHook(PreExecutionHook hook) - { - preExecutionHooks.add(hook); - } - - public static void removePreExecutionHook(PreExecutionHook hook) - { - preExecutionHooks.remove(hook); - } - - public static void addPostExecutionHook(PostExecutionHook hook) - { - postExecutionHooks.add(hook); - } - - public static void removePostExecutionHook(PostExecutionHook hook) + private QueryProcessor() { - postExecutionHooks.remove(hook); } - public static void addPostPreparationHook(PostPreparationHook hook) - { - postPreparationHooks.add(hook); - } - - public static void removePostPreparationHook(PostPreparationHook hook) - { - postPreparationHooks.remove(hook); - } - - public static CQLStatement getPrepared(MD5Digest id) + public CQLStatement getPrepared(MD5Digest id) { return preparedStatements.get(id); } - public static CQLStatement getPrepared(Integer id) + public CQLStatement getPreparedForThrift(Integer id) { return thriftPreparedStatements.get(id); } @@ -174,10 +144,9 @@ public class QueryProcessor throw new InvalidRequestException("Invalid empty value for clustering column of COMPACT TABLE"); } - private static ResultMessage processStatement(CQLStatement statement, + public static ResultMessage processStatement(CQLStatement statement, QueryState queryState, - QueryOptions options, - String queryString) + QueryOptions options) throws RequestExecutionException, RequestValidationException { logger.trace("Process {} @CL.{}", statement, options.getConsistency()); @@ -185,41 +154,24 @@ public class QueryProcessor statement.checkAccess(clientState); statement.validate(clientState); - ResultMessage result = preExecutionHooks.isEmpty() && postExecutionHooks.isEmpty() - ? statement.execute(queryState, options) - : executeWithHooks(statement, new ExecutionContext(queryState, queryString, options)); - + ResultMessage result = statement.execute(queryState, options); return result == null ? new ResultMessage.Void() : result; } - private static ResultMessage executeWithHooks(CQLStatement statement, ExecutionContext context) - throws RequestExecutionException, RequestValidationException - { - for (PreExecutionHook hook : preExecutionHooks) - statement = hook.processStatement(statement, context); - - ResultMessage result = statement.execute(context.queryState, context.queryOptions); - - for (PostExecutionHook hook : postExecutionHooks) - hook.processStatement(statement, context); - - return result; - } - public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState) throws RequestExecutionException, RequestValidationException { - return process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); + return instance.process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); } - public static ResultMessage process(String queryString, QueryState queryState, QueryOptions options) + public ResultMessage process(String queryString, QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement; if (prepared.getBoundTerms() != options.getValues().size()) throw new InvalidRequestException("Invalid amount of bind variables"); - return processStatement(prepared, queryState, options, queryString); + return processStatement(prepared, queryState, options); } public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException @@ -231,7 +183,7 @@ public class QueryProcessor { try { - ResultMessage result = process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); + ResultMessage result = instance.process(query, QueryState.forInternalCalls(), new QueryOptions(cl, Collections.<ByteBuffer>emptyList())); if (result instanceof ResultMessage.Rows) return new UntypedResultSet(((ResultMessage.Rows)result).result); else @@ -282,6 +234,12 @@ public class QueryProcessor } } + public ResultMessage.Prepared prepare(String queryString, QueryState queryState) + throws RequestValidationException + { + return prepare(queryString, queryState.getClientState(), false); + } + public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift) throws RequestValidationException { @@ -291,16 +249,7 @@ public class QueryProcessor throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT)); assert boundTerms == prepared.boundNames.size(); - ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift); - - if (!postPreparationHooks.isEmpty()) - { - PreparationContext context = new PreparationContext(clientState, queryString, prepared.boundNames); - for (PostPreparationHook hook : postPreparationHooks) - hook.processStatement(prepared.statement, context); - } - - return msg; + return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift); } private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift) @@ -336,7 +285,7 @@ public class QueryProcessor } } - public static ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options) + public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { List<ByteBuffer> variables = options.getValues(); @@ -355,40 +304,20 @@ public class QueryProcessor logger.trace("[{}] '{}'", i+1, variables.get(i)); } - return processStatement(statement, queryState, options, null); + return processStatement(statement, queryState, options); } - public static ResultMessage processBatch(BatchStatement batch, - ConsistencyLevel cl, - QueryState queryState, - List<List<ByteBuffer>> variables, - List<Object> queryOrIdList) + public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException { ClientState clientState = queryState.getClientState(); batch.checkAccess(clientState); batch.validate(clientState); - if (preExecutionHooks.isEmpty() && postExecutionHooks.isEmpty()) - batch.executeWithPerStatementVariables(cl, queryState, variables); - else - executeBatchWithHooks(batch, cl, new BatchExecutionContext(queryState, queryOrIdList, variables)); - + batch.executeWithPerStatementVariables(options.getConsistency(), queryState, options.getValues()); return new ResultMessage.Void(); } - private static void executeBatchWithHooks(BatchStatement batch, ConsistencyLevel cl, BatchExecutionContext context) - throws RequestExecutionException, RequestValidationException - { - for (PreExecutionHook hook : preExecutionHooks) - batch = hook.processBatch(batch, context); - - batch.executeWithPerStatementVariables(cl, context.queryState, context.variables); - - for (PostExecutionHook hook : postExecutionHooks) - hook.processBatch(batch, context); - } - public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState) throws RequestValidationException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/BatchExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/BatchExecutionContext.java b/src/java/org/apache/cassandra/cql3/hooks/BatchExecutionContext.java deleted file mode 100644 index 8c81bea..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/BatchExecutionContext.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.cassandra.service.QueryState; - -/** - * Contextual information about the execution of a CQL Batch. - * Used by {@link org.apache.cassandra.cql3.hooks.PreExecutionHook} and - * {@link org.apache.cassandra.cql3.hooks.PostExecutionHook} - * - * The {@code queryOrIdList} field, provides a list of objects which - * may be used to identify the individual statements in the batch. - * Currently, these objects will be one of two types (and the list may - * contain a mixture of the two). A {@code String} indicates the statement is - * a regular (i.e. non-prepared) statement, and is in fact the CQL - * string for the statement. An {@code MD5Digest} object indicates a prepared - * statement & may be used to retrieve the corresponding CQLStatement - * using {@link org.apache.cassandra.cql3.QueryProcessor#getPrepared(org.apache.cassandra.utils.MD5Digest) QueryProcessor.getPrepared()} - * - */ -public class BatchExecutionContext -{ - public final QueryState queryState; - public final List<Object> queryOrIdList; - public final List<List<ByteBuffer>> variables; - - public BatchExecutionContext(QueryState queryState, List<Object> queryOrIdList, List<List<ByteBuffer>> variables) - { - this.queryState = queryState; - this.queryOrIdList = queryOrIdList; - this.variables = variables; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/ExecutionContext.java b/src/java/org/apache/cassandra/cql3/hooks/ExecutionContext.java deleted file mode 100644 index 56d56c8..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/ExecutionContext.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import com.google.common.base.Optional; - -import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.service.QueryState; - -/** - * Contextual information about the execution of a CQLStatement. - * Used by {@link org.apache.cassandra.cql3.hooks.PreExecutionHook} and - * {@link org.apache.cassandra.cql3.hooks.PostExecutionHook} - * - * The CQL string representing the statement being executed is optional - * and is not present for prepared statements. Contexts created for the - * execution of regular (i.e. non-prepared) statements will always - * contain a CQL string. - */ -public class ExecutionContext -{ - public final QueryState queryState; - public final Optional<String> queryString; - public final QueryOptions queryOptions; - - public ExecutionContext(QueryState queryState, String queryString, QueryOptions queryOptions) - { - this.queryState = queryState; - this.queryString = Optional.fromNullable(queryString); - this.queryOptions = queryOptions; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/PostExecutionHook.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/PostExecutionHook.java b/src/java/org/apache/cassandra/cql3/hooks/PostExecutionHook.java deleted file mode 100644 index 96c742f..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/PostExecutionHook.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.cql3.statements.BatchStatement; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.exceptions.RequestValidationException; - -/** - * Run after the CQL Statement is executed in - * {@link org.apache.cassandra.cql3.QueryProcessor}. - */ -public interface PostExecutionHook -{ - /** - * Perform post-processing on a CQL statement directly after - * it being executed by the QueryProcessor. - * - * @param statement the statement to perform post-processing on - * @param context execution context containing additional info - * about the operation and statement - * @throws RequestExecutionException, RequestValidationException - */ - void processStatement(CQLStatement statement, ExecutionContext context) throws RequestExecutionException, RequestValidationException; - - /** - * Perform post-processing on a CQL batch directly after - * it being executed by the QueryProcessor. - * - * @param batch the CQL batch to perform post-processing on - * @param context execution context containing additional info - * about the operation and batch - * @throws RequestExecutionException, RequestValidationException - */ - void processBatch(BatchStatement batch, BatchExecutionContext context) throws RequestExecutionException, RequestValidationException; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/PostPreparationHook.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/PostPreparationHook.java b/src/java/org/apache/cassandra/cql3/hooks/PostPreparationHook.java deleted file mode 100644 index c2cf88a..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/PostPreparationHook.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.exceptions.RequestValidationException; - -/** - * Run directly after a CQL Statement is prepared in - * {@link org.apache.cassandra.cql3.QueryProcessor}. - */ -public interface PostPreparationHook -{ - /** - * Called in QueryProcessor, once a CQL statement has been prepared. - * - * @param statement the statement to perform additional processing on - * @param context preparation context containing additional info - * about the operation and statement - * @throws RequestValidationException - */ - void processStatement(CQLStatement statement, PreparationContext context) throws RequestValidationException; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/PreExecutionHook.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/PreExecutionHook.java b/src/java/org/apache/cassandra/cql3/hooks/PreExecutionHook.java deleted file mode 100644 index 3a8182f..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/PreExecutionHook.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.cql3.statements.BatchStatement; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.exceptions.RequestValidationException; - -/** - * Run before the CQL Statement is executed in - * {@link org.apache.cassandra.cql3.QueryProcessor}. The CQLStatement - * returned from the process* methods is what is actually executed - * by the QueryProcessor. - */ -public interface PreExecutionHook -{ - /** - * Perform pre-processing on a CQL statement prior to it being - * executed by the QueryProcessor. If required, implementations - * may modify the statement as the returned instance is what - * is actually executed. - * - * @param statement the statement to perform pre-processing on - * @param context execution context containing additional info - * about the operation and statement - * @return the actual statement that will be executed, potentially - * a modification of the initial statement - * @throws RequestExecutionException, RequestValidationException - */ - CQLStatement processStatement(CQLStatement statement, ExecutionContext context) throws RequestExecutionException, RequestValidationException; - - /** - * Perform pre-processing on a CQL batch prior to it being - * executed by the QueryProcessor. If required, implementations - * may modify the batch & its component statements as the returned - * instance is what is actually executed. - * - * @param batch the CQL batch to perform pre-processing on - * @param context execution context containing additional info - * about the operation and batch - * @return the actual batch that will be executed, potentially - * a modification of the initial batch - * @throws RequestExecutionException, RequestValidationException - */ - BatchStatement processBatch(BatchStatement batch, BatchExecutionContext context) throws RequestExecutionException, RequestValidationException; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/hooks/PreparationContext.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/hooks/PreparationContext.java b/src/java/org/apache/cassandra/cql3/hooks/PreparationContext.java deleted file mode 100644 index fda0b7d..0000000 --- a/src/java/org/apache/cassandra/cql3/hooks/PreparationContext.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.cql3.hooks; - -import java.util.List; - -import org.apache.cassandra.cql3.ColumnSpecification; -import org.apache.cassandra.service.ClientState; - -/** - * Contextual information about the preparation of a CQLStatement. - * Used by {@link org.apache.cassandra.cql3.hooks.PostPreparationHook} - */ -public class PreparationContext -{ - public final ClientState clientState; - public final String queryString; - public final List<ColumnSpecification> boundNames; - - public PreparationContext(ClientState clientState, String queryString, List<ColumnSpecification> boundNames) - { - this.clientState = clientState; - this.queryString = queryString; - this.boundNames = boundNames; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java index 561bf1c..8acbcab 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java @@ -44,4 +44,4 @@ public class ListUsersStatement extends AuthenticationStatement ConsistencyLevel.QUORUM, QueryState.forInternalCalls()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index 7f312a9..a58027e 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -27,10 +27,14 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.AuthenticationException; @@ -38,6 +42,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnauthorizedException; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.thrift.ThriftValidation; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.SemanticVersion; @@ -46,6 +51,7 @@ import org.apache.cassandra.utils.SemanticVersion; */ public class ClientState { + private static final Logger logger = LoggerFactory.getLogger(ClientState.class); public static final SemanticVersion DEFAULT_CQL_VERSION = org.apache.cassandra.cql3.QueryProcessor.CQL_VERSION; private static final Set<IResource> READABLE_SYSTEM_RESOURCES = new HashSet<>(); @@ -74,6 +80,25 @@ public class ClientState private volatile String keyspace; private SemanticVersion cqlVersion; + private static final QueryHandler cqlQueryHandler; + static + { + QueryHandler handler = QueryProcessor.instance; + String customHandlerClass = System.getProperty("cassandra.custom_query_handler_class"); + if (customHandlerClass != null) + { + try + { + handler = (QueryHandler)FBUtilities.construct(customHandlerClass, "QueryHandler"); + logger.info("Using {} as query handler for native protocol queries (as requested with -Dcassandra.custom_query_handler_class)", customHandlerClass); + } + catch (Exception e) + { + logger.info("Cannot use class {} as query handler ({}), ignoring by defaulting on normal query handling", customHandlerClass, e.getMessage()); + } + } + cqlQueryHandler = handler; + } // isInternal is used to mark ClientState as used by some internal component // that should have an ability to modify system keyspace. @@ -115,6 +140,11 @@ public class ClientState return new ClientState(remoteAddress); } + public static QueryHandler getCQLQueryHandler() + { + return cqlQueryHandler; + } + public SocketAddress getRemoteAddress() { return remoteAddress; http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index ef5eeb8..8a8c40e 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1956,7 +1956,7 @@ public class CassandraServer implements Cassandra.Iface } ThriftClientState cState = state(); - return org.apache.cassandra.cql3.QueryProcessor.process(queryString, ThriftConversion.fromThrift(cLevel), cState.getQueryState()).toThriftResult(); + return cState.getCQLQueryHandler().process(queryString, cState.getQueryState(), new QueryOptions(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList())).toThriftResult(); } catch (RequestExecutionException e) { @@ -2009,7 +2009,7 @@ public class CassandraServer implements Cassandra.Iface try { cState.validateLogin(); - return org.apache.cassandra.cql3.QueryProcessor.prepare(queryString, cState, true).toThriftPreparedResult(); + return cState.getCQLQueryHandler().prepare(queryString, cState.getQueryState()).toThriftPreparedResult(); } catch (RequestValidationException e) { @@ -2076,7 +2076,7 @@ public class CassandraServer implements Cassandra.Iface try { ThriftClientState cState = state(); - org.apache.cassandra.cql3.CQLStatement statement = org.apache.cassandra.cql3.QueryProcessor.getPrepared(itemId); + org.apache.cassandra.cql3.CQLStatement statement = cState.getCQLQueryHandler().getPreparedForThrift(itemId); if (statement == null) throw new InvalidRequestException(String.format("Prepared query with ID %d not found" + @@ -2085,9 +2085,9 @@ public class CassandraServer implements Cassandra.Iface itemId)); logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundTerms()); - return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, - cState.getQueryState(), - new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); + return cState.getCQLQueryHandler().processPrepared(statement, + cState.getQueryState(), + new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); } catch (RequestExecutionException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 487e089..221dcd9 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -25,9 +25,7 @@ import java.util.UUID; import org.jboss.netty.buffer.ChannelBuffer; -import org.apache.cassandra.cql3.Attributes; -import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.cql3.statements.ModificationStatement; import org.apache.cassandra.db.ConsistencyLevel; @@ -162,6 +160,7 @@ public class BatchMessage extends Message.Request Tracing.instance.begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap()); } + QueryHandler handler = state.getClientState().getCQLQueryHandler(); List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size()); for (int i = 0; i < queryOrIdList.size(); i++) { @@ -173,7 +172,7 @@ public class BatchMessage extends Message.Request } else { - statement = QueryProcessor.getPrepared((MD5Digest)query); + statement = handler.getPrepared((MD5Digest)query); if (statement == null) throw new PreparedQueryNotFoundException((MD5Digest)query); } @@ -203,7 +202,7 @@ public class BatchMessage extends Message.Request // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor // (and no value would be really correct, so we prefer passing a clearly wrong one). BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none()); - Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values, queryOrIdList); + Message.Response response = handler.processBatch(batch, state, new BatchQueryOptions(consistency, values, queryOrIdList)); if (tracingId != null) response.setTracingId(tracingId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index c090f9f..0a2b26d 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -27,7 +27,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.apache.cassandra.cql3.CQLStatement; -import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; @@ -101,7 +101,8 @@ public class ExecuteMessage extends Message.Request { try { - CQLStatement statement = QueryProcessor.getPrepared(statementId); + QueryHandler handler = state.getClientState().getCQLQueryHandler(); + CQLStatement statement = handler.getPrepared(statementId); if (statement == null) throw new PreparedQueryNotFoundException(statementId); @@ -128,7 +129,7 @@ public class ExecuteMessage extends Message.Request Tracing.instance.begin("Execute CQL3 prepared query", builder.build()); } - Message.Response response = QueryProcessor.processPrepared(statement, state, options); + Message.Response response = handler.processPrepared(statement, state, options); if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 002c33c..4b00f19 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -74,7 +74,7 @@ public class PrepareMessage extends Message.Request Tracing.instance.begin("Preparing CQL3 query", ImmutableMap.of("query", query)); } - Message.Response response = QueryProcessor.prepare(query, state.getClientState(), false); + Message.Response response = state.getClientState().getCQLQueryHandler().prepare(query, state); if (tracingId != null) response.setTracingId(tracingId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/485feefb/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 744e0ea..b0a48e7 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.*; @@ -116,7 +116,7 @@ public class QueryMessage extends Message.Request Tracing.instance.begin("Execute CQL3 query", builder.build()); } - Message.Response response = QueryProcessor.process(query, state, options); + Message.Response response = state.getClientState().getCQLQueryHandler().process(query, state, options); if (options.skipMetadata() && response instanceof ResultMessage.Rows) ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();