Author: jbellis
Date: Fri Jun 10 23:24:36 2011
New Revision: 1134475
URL: http://svn.apache.org/viewvc?rev=1134475&view=rev
Log:
QueryProcessor handles wait-for-schema-agreement
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2756

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Modified: cassandra/branches/cassandra-0.8/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1134475&r1=1134474&r2=1134475&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Jun 10 23:24:36 2011 @@ -9,6 +9,7 @@ - ALTER COLUMNFAMILY (CASSANDRA-1709) - DROP INDEX (CASSANDRA-2617) - add SCHEMA/TABLE as aliases for KS/CF (CASSANDRA-2743) + - server handles wait-for-schema-agreement (CASSANDRA-2756) * add support for comparator parameters and a generic ReverseType (CASSANDRA-2355) * add CompositeType and DynamicCompositeType (CASSANDRA-2231) Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1134475&r1=1134474&r2=1134475&view=diff ============================================================================== --- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Jun 10 23:24:36 2011 @@ -63,6 +63,8 @@ public class QueryProcessor { private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class); + private static final long timeLimitForSchemaAgreement = 10 * 1000; + private static List<org.apache.cassandra.db.Row> getSlice(String keyspace, SelectStatement select) throws InvalidRequestException, TimedOutException, UnavailableException { @@ -343,9 +345,9 @@ public class QueryProcessor throw new InvalidRequestException("No indexed columns present in by-columns clause with \"equals\" operator"); } } - + // Copypasta 
from o.a.c.thrift.CassandraDaemon - private static void applyMigrationOnStage(final Migration m) throws InvalidRequestException + private static void applyMigrationOnStage(final Migration m) throws SchemaDisagreementException, InvalidRequestException { Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new Callable<Object>() { @@ -380,6 +382,8 @@ public class QueryProcessor throw ex; } } + + validateSchemaIsSettled(); } public static void validateKey(ByteBuffer key) throws InvalidRequestException @@ -463,13 +467,17 @@ public class QueryProcessor // Copypasta from CassandraServer (where it is private). private static void validateSchemaAgreement() throws SchemaDisagreementException { - // unreachable hosts don't count towards disagreement - Map<String, List<String>> versions = Maps.filterKeys(StorageProxy.describeSchemaVersions(), - Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); - if (versions.size() > 1) + if (describeSchemaVersions().size() > 1) throw new SchemaDisagreementException(); } + private static Map<String, List<String>> describeSchemaVersions() + { + // unreachable hosts don't count towards disagreement + return Maps.filterKeys(StorageProxy.describeSchemaVersions(), + Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE))); + } + public static CqlResult process(String queryString, ClientState clientState) throws RecognitionException, UnavailableException, InvalidRequestException, TimedOutException, SchemaDisagreementException { @@ -940,4 +948,25 @@ public class QueryProcessor return statement; } + + private static void validateSchemaIsSettled() throws SchemaDisagreementException + { + long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement; + + outer: + while (limit - System.currentTimeMillis() >= 0) + { + String currentVersionId = DatabaseDescriptor.getDefsVersion().toString(); + for (String version : describeSchemaVersions().keySet()) + { + if (!version.equals(currentVersionId)) + continue outer; + } + + // 
schemas agree + return; + } + + throw new SchemaDisagreementException(); + } }