Author: ivol37 at gmail.com
Date: Mon Jan 24 10:14:53 2011
New Revision: 689
Log:
[AMDATU-252] Fixed timing issue by using snapshots. On each run a snapshot is
made and compared to the previous snapshot; if there are any changes, those
changes, as contained in the new snapshot, are handled. This prevents the
handling of updates from being affected by keyspaces or CFs that are added,
removed or updated while the handling is in progress. Previously such changes
could fail to be handled, as it all depended on timing: when a new keyspace
was added just after new keyspaces were handled, the change would not be
picked up.
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Mon Jan 24 10:14:53 2011
@@ -21,6 +21,7 @@
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
@@ -85,7 +86,7 @@
*
*/
class InpectKeyspaceColumnFamilyThread extends Thread {
- Map<String, List<String>> m_keyspaceColumnFamilyMap = new
HashMap<String, List<String>>();
+ Map<String, List<String>> m_snapshot = new HashMap<String,
List<String>>();
@Override
public void run() {
@@ -93,15 +94,19 @@
while(!isInterrupted()) {
// Inspect available keyspaces
try {
- // Only compare keyspaces with CPM's if there was a
keyspace change
- // in the meantime
- if (checkForUpdates()) {
+ // Only compare keyspaces with CPM's if there was a
keyspace change in the meantime
+ // We make a snapshot before we start checking if any
updates are needed and performing them.
+ // Updates are needed when the snapshot taken now
differs from the one taken previously
+ Map<String, List<String>> newSnapshot = getSnapshot();
+ if (!m_snapshot.equals(newSnapshot)) {
+ m_snapshot = newSnapshot;
onKeyspaceAdded();
onKeyspaceDropped();
onColumnFamilyAdded();
onColumnFamilyRemoved();
+ } else {
+ m_snapshot = newSnapshot;
}
- m_keyspaceColumnFamilyMap =
getKeyspaceColumnFamilyMap();
}
catch (TException e) {
m_logService.log(LogService.LOG_ERROR, "Could not
retrieve keyspaces. Cause: " + e.getMessage(), e);
@@ -120,10 +125,12 @@
}
}
catch (InterruptedException e) {
+ m_logService.log(LogService.LOG_INFO, "Cassandra update
listener thread interrupted.");
}
}
- private Map<String, List<String>> getKeyspaceColumnFamilyMap() throws
TException, InvalidRequestException, NotFoundException {
+ // Creates and returns a snapshot of the currently available keyspaces
and columnfamilies
+ private Map<String, List<String>> getSnapshot() throws TException,
InvalidRequestException, NotFoundException {
Map<String, List<String>> map = new HashMap<String,
List<String>>();
for (String keyspace : m_daemonService.getKeyspaces()) {
map.put(keyspace, m_daemonService.getColumnFamilies(keyspace));
@@ -131,21 +138,10 @@
return map;
}
- private boolean checkForUpdates() throws TException,
InvalidRequestException, NotFoundException {
- if (m_keyspaceColumnFamilyMap.keySet() == null &&
m_daemonService.getKeyspaces() == null) {
- return false;
- } else if (m_daemonService.getKeyspaces() == null) {
- return true;
- } else {
-
- return
!getKeyspaceColumnFamilyMap().equals(m_keyspaceColumnFamilyMap);
- }
- }
-
// Loop over all keyspaces and register a Cassandra persistence
manager when there is none
// available for that keyspace
private void onKeyspaceAdded() throws InvalidSyntaxException,
TException, InvalidRequestException, NotFoundException {
- List<String> keyspaces = m_daemonService.getKeyspaces();
+ Set<String> keyspaces = m_snapshot.keySet();
if (keyspaces != null) {
for (String keyspace : keyspaces) {
String filter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
@@ -170,13 +166,13 @@
for (ServiceReference ref : servRefs) {
ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
if (isKeyspaceGlobal(provider)) {
- List<String> keyspaces =
m_daemonService.getKeyspaces();
+ Set<String> keyspaces = m_snapshot.keySet();
if (keyspaces != null) {
for (String keyspace : keyspaces) {
if (!Table.SYSTEM_TABLE.equals(keyspace)) {
// Verify that the ColumnFamily for this
keyspace global provider is available in this keyspace
for (ColumnFamilyDefinition cfDef :
provider.getColumnFamilies()) {
- if
(!m_daemonService.getColumnFamilies(keyspace).contains(cfDef.getName())) {
+ if
(!m_snapshot.get(keyspace).contains(cfDef.getName())) {
m_logService.log(LogService.LOG_DEBUG, "Adding ColumnFamily '" +
cfDef.getName() + "' to keyspace '"
+ keyspace + "' for the
keyspace-global ColumnFamilyProvider '" + provider.getClass().getName() + "'");
final String cfName =
cfDef.getName();
@@ -206,7 +202,7 @@
// Loop over all Cassandra Persistence Managers and unregister them if
the corresponding keyspace
// is removed from Cassandra
private void onKeyspaceDropped() throws InvalidSyntaxException,
TException, InvalidRequestException {
- List<String> keyspaces = m_daemonService.getKeyspaces();
+ Set<String> keyspaces = m_snapshot.keySet();
ServiceReference[] servRefs =
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
null);
if (servRefs != null) {
for (ServiceReference servRef : servRefs) {
@@ -221,10 +217,10 @@
}
private void onColumnFamilyAdded() throws TException,
InvalidRequestException, InvalidSyntaxException, NotFoundException {
- List<String> keyspaces = m_daemonService.getKeyspaces();
+ Set<String> keyspaces = m_snapshot.keySet();
if (keyspaces != null) {
for (String keyspace : keyspaces) {
- List<String> columnFamilies =
m_daemonService.getColumnFamilies(keyspace);
+ List<String> columnFamilies = m_snapshot.get(keyspace);
for (String columnFamily : columnFamilies) {
String ksFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
String cfFilter = "(" +
ColumnFamilyAvailable.FILTER_NAME + "=" + columnFamily + ")";
@@ -244,18 +240,7 @@
for (ServiceReference servRef : servRefs) {
String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
String columnFamily =
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
- boolean remove = false;
- try {
- if
(!m_daemonService.getColumnFamilies(keyspace).contains(columnFamily)) {
- remove = true;
- }
-
- } catch (NotFoundException e) {
- // This exception is thrown when the keyspace could
not be found anymore, in which
- // case also the ColumnFamilyAvailable for that
keyspace should be removed
- remove = true;
- }
- if (remove) {
+ if (m_snapshot.get(keyspace) == null ||
!m_snapshot.get(keyspace).contains(columnFamily)) {
m_context.ungetService(servRef);
m_logService.log(LogService.LOG_INFO,
"ColumnFamilyAvailable service for keyspace '" + keyspace
+ "' and ColumnFamily '" + columnFamily + "'
unregistered");