Author: [email protected]
Date: Tue May 29 12:39:46 2012
New Revision: 2409
Log:
[AMDATUCASSANDRA-189] Applied patch attached to the issue
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/CassandraClientService.java
Tue May 29 12:39:46 2012
@@ -16,6 +16,7 @@
package org.amdatu.cassandra.client;
import java.util.List;
+import java.util.Map;
import org.apache.cassandra.thrift.CfDef;
@@ -51,6 +52,23 @@
String EVENT_ADMIN_KEYSPACE_DROPPED = "keyspace_dropped";
/**
+ * Describes the schema versions of all hosts in the cluster. The key in
the map is a UUID representing
+ * the schema versions and its value is a list of the IP addresses of all
hosts with this schema version. If the
+ * map contains more than 1 key, there is no schema agreement across the
cluster (this typically happens
+ * immediately after adding a tenant where the cluster is processing the
keyspace/CF changes).
+ * @return Map of schema version onto the hosts with this version
+ */
+ Map<String, List<String>> describeSchemaVersions();
+
+ /**
+ * Block the current thread until schema agreement has been reached
throughout the cluster, with a
+ * configurable timeout (see schema_agreement_timeout setting in
org.amdatu.core.cassandra.client.cfg).
+ * Default timeout is 30 seconds. If no schema agreement is reached within
the timeout, a WARNING
+ * message is logged.
+ */
+ void waitForSchemaAgreement();
+
+ /**
* Returns a list of all available keyspaces. Note that keyspace names are
case-sensitive
* in Cassandra.
*
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
Tue May 29 12:39:46 2012
@@ -150,6 +150,10 @@
return CLUSTER;
}
+ public Map<String, List<String>> describeSchemaVersions() {
+ return getHectorCluster().describeSchemaVersions();
+ }
+
public synchronized boolean keyspaceExists(final String keyspaceName) {
List<String> ksNames = getKeyspaces();
for (String ksName : ksNames) {
@@ -175,7 +179,7 @@
}
}
- private void waitForSchemaAgreement() {
+ public void waitForSchemaAgreement() {
String agreed = checkSchemaAgreement();
long expires = System.currentTimeMillis() + 1000 *
m_configuration.get(SCHEMA_AGREEMENT_TIMEOUT, Integer.class);
boolean pastAgreement = (agreed != null);
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Tue May 29 12:39:46 2012
@@ -127,19 +127,23 @@
while (!isInterrupted()) {
// Inspect available keyspaces
try {
- // Only compare keyspaces with CPM's if there was a
keyspace change in the meantime
- // We make a snapshot before we start checking if any
updates are needed and performing them.
- // Updates are needed when the snapshot taken now
differs from the one taken previously
- Map<String, List<String>> newSnapshot = getSnapshot();
- if (!m_snapshot.equals(newSnapshot)) {
- m_snapshot = newSnapshot;
- onKeyspaceAdded();
- onKeyspaceDropped();
- onColumnFamilyAdded();
- onColumnFamilyRemoved();
- }
- else {
- m_snapshot = newSnapshot;
+ // First of all we verify that the schema version
(agreed upon by the whole cluster)
+ // has been changed compared to the last time we
checked it.
+ if (isSchemaUpdated()) {
+ // We make snapshots with each schema change so
that we can determine what keyspaces and/or
+ // CFs have been added or removed. If the snapshot
has not been changed, the schema change
+ // is not related to added or removed keyspaces or
CFs, and so we can skip the update.
+ Map<String, List<String>> newSnapshot =
getSnapshot();
+ if (!m_snapshot.equals(newSnapshot)) {
+ m_snapshot = newSnapshot;
+ onKeyspaceAdded();
+ onKeyspaceDropped();
+ onColumnFamilyAdded();
+ onColumnFamilyRemoved();
+ }
+ else {
+ m_snapshot = newSnapshot;
+ }
}
}
catch (Exception e) {
@@ -148,7 +152,7 @@
+ e.getMessage(), e);
}
- Thread.sleep(INSPECT_INTERVAL);
+ Thread.sleep(INSPECT_INTERVAL);
}
}
catch (InterruptedException e) {
@@ -156,6 +160,42 @@
}
}
+ private String m_schemaVersion = null;
+
+ /**
+  * Determines whether the cluster-wide schema version differs from the one memorized
+  * in m_schemaVersion during a previous invocation.
+  *
+  * If the cluster currently reports more than one schema version, this method first
+  * waits for schema agreement and then queries the schema versions again.
+  *
+  * @return true if the schema version changed (or was not memorized yet), or if the
+  *         cluster still does not report exactly one schema version after waiting;
+  *         false if no schema is reported or the version is unchanged
+  */
+ private boolean isSchemaUpdated() {
+     // First determine the new current schema version
+     Map<String, List<String>> schemaVersions = m_daemonService.describeSchemaVersions();
+     Set<String> versions = schemaVersions.keySet();
+     if (versions.isEmpty()) {
+         // No schema, no update
+         return false;
+     }
+     else if (versions.size() > 1) {
+         // No schema agreement yet. We first wait until schema agreement has been reached.
+         // If this is not completed within the timeout, we handle it as an update of the schema.
+         m_daemonService.waitForSchemaAgreement();
+
+         // Now try again. The schema versions must be queried again here: the map retrieved
+         // before waiting still holds the old, disagreeing versions and would always make
+         // the check below report an update.
+         schemaVersions = m_daemonService.describeSchemaVersions();
+         versions = schemaVersions.keySet();
+         if (versions.size() != 1) {
+             // Still no agreement, consider this as an update of the schema
+             return true;
+         }
+     }
+     String newSchemaVersion = versions.iterator().next();
+
+     if (m_schemaVersion == null || !m_schemaVersion.equals(newSchemaVersion)) {
+         // We did not yet memorize our schema version or the schema version has changed.
+         // So memorize the new schema version and return true, indicating that the schema
+         // has changed.
+         m_schemaVersion = newSchemaVersion;
+         return true;
+     }
+
+     // No schema change detected
+     return false;
+ }
+
+
// Creates and returns a snapshot of the currently available keyspaces
and column families
private Map<String, List<String>> getSnapshot() throws TException,
InvalidRequestException, NotFoundException {
Map<String, List<String>> map = new HashMap<String,
List<String>>();
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits