Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 109313c3e -> 90a211455


Invalidate prepared stmts when ks or table is dropped

Patch by Viju Kothuvatiparambil; review by Tyler Hobbs for CASSANDRA-7566


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90a21145
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90a21145
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90a21145

Branch: refs/heads/cassandra-2.1
Commit: 90a2114551bf2052e2abccbc04c654ceec74c2d3
Parents: 109313c
Author: Viju Kothuvatiparambil <viju.kothuv...@mindmax.us>
Authored: Tue Sep 16 17:46:53 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Tue Sep 16 17:46:53 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 67 +++++++++++++++
 .../cql3/PreparedStatementCleanupTest.java      | 86 ++++++++++++++++++++
 3 files changed, 155 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1764a20..ca578f3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.1
+ * Invalidate prepared statements when their keyspace or table is
+   dropped (CASSANDRA-7566)
  * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
  * Fix saving caches when a table is dropped (CASSANDRA-7784)
  * Add better error checking of new stress profile (CASSANDRA-7716)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index a5be108..efd1ebb 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -29,6 +29,8 @@ import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
 import com.googlecode.concurrentlinkedhashmap.EntryWeigher;
 import com.googlecode.concurrentlinkedhashmap.EvictionListener;
 import org.antlr.runtime.*;
+import org.apache.cassandra.service.IMigrationListener;
+import org.apache.cassandra.service.MigrationManager;
 import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,6 +148,7 @@ public class QueryProcessor implements QueryHandler
 
     private QueryProcessor()
     {
+        MigrationManager.instance.register(new MigrationSubscriber());
     }
 
     public ParsedStatement.Prepared getPrepared(MD5Digest id)
@@ -508,4 +511,68 @@ public class QueryProcessor implements QueryHandler
              ? ((MeasurableForPreparedCache)key).measureForPreparedCache(meter)
              : meter.measureDeep(key);
     }
+
+    private static class MigrationSubscriber implements IMigrationListener
+    {
+        private void removeInvalidPreparedStatements(String ksName, String cfName)
+        {
+            Iterator<ParsedStatement.Prepared> iterator = preparedStatements.values().iterator();
+            while (iterator.hasNext())
+            {
+                if (shouldInvalidate(ksName, cfName, iterator.next().statement))
+                    iterator.remove();
+            }
+
+            Iterator<CQLStatement> thriftIterator = thriftPreparedStatements.values().iterator();
+            while (thriftIterator.hasNext())
+            {
+                if (shouldInvalidate(ksName, cfName, thriftIterator.next()))
+                    thriftIterator.remove();
+            }
+        }
+
+        private boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement)
+        {
+            String statementKsName;
+            String statementCfName;
+
+            if (statement instanceof ModificationStatement)
+            {
+                ModificationStatement modificationStatement = ((ModificationStatement) statement);
+                statementKsName = modificationStatement.keyspace();
+                statementCfName = modificationStatement.columnFamily();
+            }
+            else if (statement instanceof SelectStatement)
+            {
+                SelectStatement selectStatement = ((SelectStatement) statement);
+                statementKsName = selectStatement.keyspace();
+                statementCfName = selectStatement.columnFamily();
+            }
+            else
+            {
+                return false;
+            }
+
+            return ksName.equals(statementKsName) && (cfName == null || cfName.equals(statementCfName));
+        }
+
+        public void onCreateKeyspace(String ksName) { }
+        public void onCreateColumnFamily(String ksName, String cfName) { }
+        public void onCreateUserType(String ksName, String typeName) { }
+        public void onUpdateKeyspace(String ksName) { }
+        public void onUpdateColumnFamily(String ksName, String cfName) { }
+        public void onUpdateUserType(String ksName, String typeName) { }
+
+        public void onDropKeyspace(String ksName)
+        {
+            removeInvalidPreparedStatements(ksName, null);
+        }
+
+        public void onDropColumnFamily(String ksName, String cfName)
+        {
+            removeInvalidPreparedStatements(ksName, cfName);
+        }
+
+        public void onDropUserType(String ksName, String typeName) { }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90a21145/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
new file mode 100644
index 0000000..3e725e9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementCleanupTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class PreparedStatementCleanupTest extends SchemaLoader
+{
+    private static Cluster cluster;
+    private static Session session;
+
+    private static final String KEYSPACE = "prepared_stmt_cleanup";
+    private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
+                                                    " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
+    private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        Schema.instance.clear();
+
+        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        // Currently the native server start method return before the server is fully binded to the socket, so we need
+        // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep.
+        Thread.sleep(500);
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
+                                   .build();
+        session = cluster.connect();
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testInvalidatePreparedStatementsOnDrop()
+    {
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (id int PRIMARY KEY, cid int, val text);";
+        String dropTableStatement = "DROP TABLE IF EXISTS " + KEYSPACE + ".qp_cleanup;";
+
+        session.execute(createTableStatement);
+        PreparedStatement prepared = session.prepare("INSERT INTO " + KEYSPACE + ".qp_cleanup (id, cid, val) VALUES (?, ?, ?)");
+        session.execute(dropTableStatement);
+        session.execute(createTableStatement);
+        session.execute(prepared.bind(1, 1, "value"));
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+        session.execute(createTableStatement);
+        session.execute(prepared.bind(1, 1, "value"));
+        session.execute(dropKsStatement);
+    }
+}