Author: [email protected]
Date: Wed Jun  6 11:19:06 2012
New Revision: 2436

Log:
[AMDATUCASSANDRA-189] Fixed network traffic issue by verifying if the schema 
version has changed, before checking if any CF or keyspace has been added or 
dropped

Modified:
   
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
   
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
   
branches/amdatu-cassandra-0.2.4/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java

Modified: 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
==============================================================================
--- 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
      (original)
+++ 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
      Wed Jun  6 11:19:06 2012
@@ -16,6 +16,7 @@
 package org.amdatu.cassandra.client;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.cassandra.thrift.CfDef;
 
@@ -51,6 +52,23 @@
     String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
 
     /**
+     * Describes the schema versions of all hosts in the cluster. The key in 
the map is a UUID representing
+     * the schema versions and its value is a list of the IP addresses of all 
hosts with this schema version. If the
+     * map contains more than 1 key, there is no schema agreement across the 
cluster (this typically happens 
+     * immediately after adding a tenant where the cluster is processing the 
keyspace/CF changes).
+     * @return Map of schema version onto the hosts with this version
+     */
+    Map<String, List<String>> describeSchemaVersions();
+    
+    /**
+     * Block the current thread until schema agreement has been reached 
throughout the cluster, with a 
+     * configurable timeout (see schema_agreement_timeout setting in 
org.amdatu.core.cassandra.client.cfg).
+     * Default timeout is 30 seconds. If no schema agreement is reached within 
the timeout, a WARNING 
+     * message is logged.
+     */
+    void waitForSchemaAgreement();
+
+    /**
      * Returns a list of all available keyspaces. Note that keyspace names are 
case-sensitive
      * in Cassandra.
      * 

Modified: 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
--- 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
  (original)
+++ 
branches/amdatu-cassandra-0.2.4/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
  Wed Jun  6 11:19:06 2012
@@ -150,6 +150,10 @@
         return CLUSTER;
     }
 
+    public synchronized Map<String, List<String>> describeSchemaVersions() {
+               return getHectorCluster().describeSchemaVersions();
+    }
+
     public synchronized boolean keyspaceExists(final String keyspaceName) {
         List<String> ksNames = getKeyspaces();
         for (String ksName : ksNames) {
@@ -175,7 +179,7 @@
         }
     }
 
-    private void waitForSchemaAgreement() {
+    public void waitForSchemaAgreement() {
         String agreed = checkSchemaAgreement();
         long expires = System.currentTimeMillis() + 1000 * 
m_configuration.get(SCHEMA_AGREEMENT_TIMEOUT, Integer.class);
         boolean pastAgreement = (agreed != null);

Modified: 
branches/amdatu-cassandra-0.2.4/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
--- 
branches/amdatu-cassandra-0.2.4/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
     (original)
+++ 
branches/amdatu-cassandra-0.2.4/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
     Wed Jun  6 11:19:06 2012
@@ -55,6 +55,7 @@
  */
 public class CassandraUpdateListenerImpl {
     private static final String SYSTEM_TABLE = "system";
+    private static final String UNREACHABLE = "UNREACHABLE";
 
     // Services injected by the dependency manager
     private volatile LogService m_logService;
@@ -68,6 +69,9 @@
 
     // The interval for each individual inspect
     private static int INSPECT_INTERVAL = 5000;
+    
+    // The current schema version
+    private String m_schemaVersion = null;
 
     private Map<String, Component> m_componentMap = new 
ConcurrentHashMap<String, Component>();
 
@@ -126,19 +130,23 @@
                 while (!isInterrupted()) {
                     // Inspect available keyspaces
                     try {
-                        // Only compare keyspaces with CPM's if there was a 
keyspace change in the meantime
-                        // We make a snapshot before we start checking if any 
updates are needed and performing them.
-                        // Updates are needed when the snapshot taken now 
differs from the one taken previously
-                        Map<String, List<String>> newSnapshot = getSnapshot();
-                        if (!m_snapshot.equals(newSnapshot)) {
-                            m_snapshot = newSnapshot;
-                            onKeyspaceAdded();
-                            onKeyspaceDropped();
-                            onColumnFamilyAdded();
-                            onColumnFamilyRemoved();
-                        }
-                        else {
-                            m_snapshot = newSnapshot;
+                        // First of all we verify that the schema version 
(agreed upon by the whole cluster)
+                        // has been changed compared to the last time we 
checked it.
+                        if (isSchemaUpdated()) {
+                            // We make snapshots with each schema change so 
that we can determine what keyspaces and/or
+                            // CFs have been added or removed. If the snapshot 
has not been changed, the schema change 
+                            // is not related to added or removed keyspaces or 
CFs, and so we can skip the update.
+                            Map<String, List<String>> newSnapshot = 
getSnapshot();
+                            if (!m_snapshot.equals(newSnapshot)) {
+                                m_snapshot = newSnapshot;
+                                onKeyspaceAdded();
+                                onKeyspaceDropped();
+                                onColumnFamilyAdded();
+                                onColumnFamilyRemoved();
+                            }
+                            else {
+                                m_snapshot = newSnapshot;
+                            }
                         }
                     }
                     catch (Exception e) {
@@ -147,13 +155,58 @@
                                 + e.getMessage(), e);
                     }
 
-                    Thread.sleep(INSPECT_INTERVAL);
+                     Thread.sleep(INSPECT_INTERVAL);
                 }
             }
             catch (InterruptedException e) {
                 m_logService.log(LogService.LOG_INFO, "Cassandra update 
listener thread interrupted.");
             }
         }
+        
+        private Set<String> getSchemaVersions() {
+            // Retrieve the schema versions
+            Map<String, List<String>> schemaVersions = 
m_daemonService.describeSchemaVersions();
+            
+            // Ignore schema versions for UNREACHABLE nodes (unreachable 
nodes have the UNREACHABLE version)
+            schemaVersions.remove(UNREACHABLE);
+            
+            // Return the versions
+            return schemaVersions.keySet();
+        }
+
+        private boolean isSchemaUpdated() {
+            // First determine the new current schema version
+            Set<String> versions = getSchemaVersions();
+            if (versions.size() == 0) {
+                // No schema, no update
+                return false;
+            }
+            else if (versions.size() > 1) {
+                // No schema agreement yet. We first wait until schema 
agreement has been reached.
+                // If this is not completed within the timeout, we handle it 
as an update of the schema.
+                m_daemonService.waitForSchemaAgreement(); 
+                
+                // Now try again
+                versions = getSchemaVersions();
+                
+                if (versions.size() != 1) {
+                    // Still no agreement, consider this as an update of the 
schema
+                    return true;
+                }
+            }
+            String newSchemaVersion = versions.iterator().next();
+
+            if (m_schemaVersion == null || 
!m_schemaVersion.equals(newSchemaVersion)) {
+                // We did not yet memorize our schema version or the schema 
version has changed. 
+                // So memorize the new schema version and return true, 
indicating that the schema
+                // has changed.
+                m_schemaVersion = newSchemaVersion;
+                return true;
+            }
+            
+            // No schema change detected
+            return false;
+        }
 
         // Creates and returns a snapshot of the currently available keyspaces 
and column families
         private Map<String, List<String>> getSnapshot() throws TException, 
InvalidRequestException, NotFoundException {
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to