Author: jbellis
Date: Fri Jun 10 23:24:36 2011
New Revision: 1134475

URL: http://svn.apache.org/viewvc?rev=1134475&view=rev
Log:
QueryProcessor handles wait-for-schema-agreement
patch by pyaskevich; reviewed by jbellis for CASSANDRA-2756

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Fri Jun 10 23:24:36 2011
@@ -9,6 +9,7 @@
    - ALTER COLUMNFAMILY (CASSANDRA-1709)
    - DROP INDEX (CASSANDRA-2617)
    - add SCHEMA/TABLE as aliases for KS/CF (CASSANDRA-2743)
+   - server handles wait-for-schema-agreement (CASSANDRA-2756)
  * add support for comparator parameters and a generic ReverseType
    (CASSANDRA-2355)
  * add CompositeType and DynamicCompositeType (CASSANDRA-2231)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1134475&r1=1134474&r2=1134475&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/cql/QueryProcessor.java Fri Jun 10 23:24:36 2011
@@ -63,6 +63,8 @@ public class QueryProcessor
 {
     private static final Logger logger = 
LoggerFactory.getLogger(QueryProcessor.class);
 
+    private static final long timeLimitForSchemaAgreement = 10 * 1000;
+
     private static List<org.apache.cassandra.db.Row> getSlice(String keyspace, 
SelectStatement select)
     throws InvalidRequestException, TimedOutException, UnavailableException
     {
@@ -343,9 +345,9 @@ public class QueryProcessor
             throw new InvalidRequestException("No indexed columns present in 
by-columns clause with \"equals\" operator");
         }
     }
-    
+
     // Copypasta from o.a.c.thrift.CassandraDaemon
-    private static void applyMigrationOnStage(final Migration m) throws 
InvalidRequestException
+    private static void applyMigrationOnStage(final Migration m) throws 
SchemaDisagreementException, InvalidRequestException
     {
         Future<?> f = StageManager.getStage(Stage.MIGRATION).submit(new 
Callable<Object>()
         {
@@ -380,6 +382,8 @@ public class QueryProcessor
                 throw ex;
             }
         }
+
+        validateSchemaIsSettled();
     }
     
     public static void validateKey(ByteBuffer key) throws 
InvalidRequestException
@@ -463,13 +467,17 @@ public class QueryProcessor
     // Copypasta from CassandraServer (where it is private).
     private static void validateSchemaAgreement() throws 
SchemaDisagreementException
     {
-        // unreachable hosts don't count towards disagreement
-        Map<String, List<String>> versions = 
Maps.filterKeys(StorageProxy.describeSchemaVersions(),
-                                                             
Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
-        if (versions.size() > 1)
+       if (describeSchemaVersions().size() > 1)
             throw new SchemaDisagreementException();
     }
 
+    private static Map<String, List<String>> describeSchemaVersions()
+    {
+        // unreachable hosts don't count towards disagreement
+        return Maps.filterKeys(StorageProxy.describeSchemaVersions(),
+                               
Predicates.not(Predicates.equalTo(StorageProxy.UNREACHABLE)));
+    }
+
     public static CqlResult process(String queryString, ClientState 
clientState)
     throws RecognitionException, UnavailableException, 
InvalidRequestException, TimedOutException, SchemaDisagreementException
     {
@@ -940,4 +948,25 @@ public class QueryProcessor
         
         return statement;
     }
+
+    private static void validateSchemaIsSettled() throws 
SchemaDisagreementException
+    {
+        long limit = System.currentTimeMillis() + timeLimitForSchemaAgreement;
+
+        outer:
+        while (limit - System.currentTimeMillis() >= 0)
+        {
+            String currentVersionId = 
DatabaseDescriptor.getDefsVersion().toString();
+            for (String version : describeSchemaVersions().keySet())
+            {
+                if (!version.equals(currentVersionId))
+                    continue outer;
+            }
+
+            // schemas agree
+            return;
+        }
+
+        throw new SchemaDisagreementException();
+    }
 }


Reply via email to