Author: ivol37 at gmail.com
Date: Wed Jan 19 15:17:10 2011
New Revision: 678

Log:
[AMDATU-252] Refactored registration mechanism for ColumnFamilyProviders and 
CassandraPersistenceManagers to work in a clustered setup

Added:
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
   
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
   
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
       (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
       Wed Jan 19 15:17:10 2011
@@ -116,16 +116,12 @@
                 // Now wait until the operation mode of the daemon becomes 
"Normal". In auto bootstrap mode this can take quite a
                 // while (2 minutes minimum). In a single node cluster this 
will be almost immediately. Unfortunately the operation
                 // mode is not covered by any enum value.
-                String prevOperationMode = "";
                 String operationMode = 
StorageService.instance.getOperationMode();
-                while (!operationMode.equals("Normal") && !isInterrupted() && 
isAlive()) {
-                    if (!operationMode.equals(prevOperationMode)) {
-                        m_logService.log(LogService.LOG_INFO, "Current 
Cassandra Daemon operation mode is '" + operationMode
-                            + "', waiting for daemon to reach operation mode 
'Normal'");
-                    }
-                    prevOperationMode = operationMode;
-                    operationMode = StorageService.instance.getOperationMode();
+                while (!"Normal".equals(operationMode) && !isInterrupted() && 
isAlive()) {
+                    m_logService.log(LogService.LOG_INFO, "Current Cassandra 
Daemon operation mode is '" + operationMode
+                        + "', waiting for daemon to reach operation mode 
'Normal'");
                     Thread.sleep(DAEMON_TIMEOUT);
+                    operationMode = StorageService.instance.getOperationMode();
                 }
 
                 if 
("Normal".equals(StorageService.instance.getOperationMode())) {

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/osgi/Activator.java
   Wed Jan 19 15:17:10 2011
@@ -18,6 +18,7 @@
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
 import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.listener.service.CassandraUpdateListenerImpl;
 import org.amdatu.cassandra.listener.service.ColumnFamilyHandler;
 import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
 import org.apache.felix.dm.DependencyActivatorBase;
@@ -33,17 +34,23 @@
 public class Activator extends DependencyActivatorBase {
     @Override
     public void init(BundleContext context, DependencyManager manager) throws 
Exception {
+        manager.add(createComponent()
+            .setImplementation(CassandraUpdateListenerImpl.class)
+            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true)));
+
         // Register the CassandraColumnFamilyProvider listener
         manager
-            .add(
+        .add(
             createComponent()
-                .setImplementation(ColumnFamilyHandler.class)
-                
.add(createServiceDependency().setService(LogService.class).setRequired(true))
-                
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
-                
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
-                .add(createServiceDependency()
-                        .setService(ColumnFamilyProvider.class)
-                        .setCallbacks("columnFamilyProviderAdded", 
"columnFamilyProviderRemoved")));
+            .setImplementation(ColumnFamilyHandler.class)
+            
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraDaemonService.class).setRequired(true))
+            
.add(createServiceDependency().setService(CassandraPersistenceManagerFactory.class).setRequired(true))
+            .add(createServiceDependency()
+                .setService(ColumnFamilyProvider.class)
+                .setCallbacks("columnFamilyProviderAdded", 
"columnFamilyProviderRemoved")));
     }
 
     @Override

Added: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
      Wed Jan 19 15:17:10 2011
@@ -0,0 +1,280 @@
+/*
+    Copyright (C) 2010 Amdatu.org
+
+    This program is free software: you can redistribute it and/or modify
+    it under the terms of the GNU General Public License as published by
+    the Free Software Foundation, either version 3 of the License, or
+    (at your option) any later version.
+
+    This program is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU General Public License for more details.
+
+    You should have received a copy of the GNU General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.listener.service;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.amdatu.cassandra.application.CassandraDaemonService;
+import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
+import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
+import org.amdatu.cassandra.listener.ColumnFamilyProvider;
+import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
+import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.felix.dm.Component;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.TException;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.log.LogService;
+
+/**
+ * This class is responsible for listening to changes (add or remove) in the 
Cassandra database with respect to
+ * keyspaces and ColumnFamilies. When a keyspace is added, it will register a 
new CassandraPersistenceManager.
+ * When a keyspace is dropped, it will unregister the corresponding 
CassandraPersistenceManager.
+ * When a ColumnFamily is added, it will register a new ColumnFamilyAvailable 
service. When a ColumnFamily is dropped
+ * it will unregister the corresponding ColumnFamilyAvailable service.
+ * Note that in a clustered setup, keyspaces and columnfamilies can be added 
or removed by other nodes in the cluster.
+ * Since there is no event mechanism for signalling those changes, we create 
an inspect thread that polls the available
+ * keyspaces and columnfamilies every INSPECT_INTERVAL milliseconds.
+ * 
+ * @author ivol
+ */
+public class CassandraUpdateListenerImpl {
+    // Services injected by the dependency manager
+    private volatile LogService m_logService;
+    private volatile CassandraDaemonService m_daemonService;
+    private volatile CassandraPersistenceManagerFactory m_pmFactory;
+    private volatile BundleContext m_context;
+    private volatile DependencyManager m_dependencyManager;
+
+    // The thread that inspects the cassandra db for changes
+    private InpectKeyspaceColumnFamilyThread m_inspectThread;
+
+    // The interval between two consecutive inspections, in milliseconds
+    private int INSPECT_INTERVAL = 5000;
+
+    public void start() {
+        // Now start the inspect thread
+        m_inspectThread = new InpectKeyspaceColumnFamilyThread();
+        m_inspectThread.start();
+    }
+
+    public void stop() {
+        // Stop the inspect thread
+        m_inspectThread.interrupt();
+    }
+
+    /**
+     * This Thread inspects available Keyspaces and ColumnFamilies in 
Cassandra and compares these to
+     * the registered CassandraPersistenceManager and ColumnFamilyAvailable 
services. Since other nodes
+     * in the cluster may add, update or drop keyspaces and columnfamilies, and 
Cassandra does not offer
+     * any event mechanism to act upon such changes, we continuously 
inspect this ourselves.
+     * @author ivol
+     *
+     */
+    class InpectKeyspaceColumnFamilyThread extends Thread {
+        Map<String, List<String>> m_keyspaceColumnFamilyMap = new 
HashMap<String, List<String>>();
+
+        @Override
+        public void run() {
+            try {
+                while(!isInterrupted()) {
+                    // Inspect available keyspaces
+                    try {
+                        // Only synchronize the registered services if a 
keyspace or ColumnFamily changed
+                        // in the meantime
+                        if (checkForUpdates()) {
+                            onKeyspaceAdded();
+                            onKeyspaceDropped();
+                            onColumnFamilyAdded();
+                            onColumnFamilyRemoved();
+                        }
+                        m_keyspaceColumnFamilyMap = 
getKeyspaceColumnFamilyMap();
+                    }
+                    catch (TException e) {
+                        m_logService.log(LogService.LOG_ERROR, "Could not 
retrieve keyspaces", e);
+                    }
+                    catch (InvalidRequestException e) {
+                        m_logService.log(LogService.LOG_ERROR, "Could not 
retrieve keyspaces", e);
+                    }
+                    catch (InvalidSyntaxException e) {
+                        m_logService.log(LogService.LOG_ERROR,"Could not 
retrieve Cassandra Persistence Manager services", e);
+                    }
+                    catch (NotFoundException e) {
+                        m_logService.log(LogService.LOG_ERROR,"An error 
occurred while synchronizing ColumnFamilyAvailable services", e);
+                    }
+
+                    Thread.sleep(INSPECT_INTERVAL);
+                }
+            }
+            catch (InterruptedException e) {
+            }
+        }
+
+        private Map<String, List<String>> getKeyspaceColumnFamilyMap() throws 
TException, InvalidRequestException, NotFoundException {
+            Map<String, List<String>> map = new HashMap<String, 
List<String>>();
+            for (String keyspace : m_daemonService.getKeyspaces()) {
+                map.put(keyspace, m_daemonService.getColumnFamilies(keyspace));
+            }
+            return map;
+        }
+
+        private boolean checkForUpdates() throws TException, 
InvalidRequestException, NotFoundException {
+            if (m_keyspaceColumnFamilyMap.keySet() == null && 
m_daemonService.getKeyspaces() == null) {
+                return false;
+            } else if (m_daemonService.getKeyspaces() == null) {
+                return true;
+            } else {
+
+                return 
!getKeyspaceColumnFamilyMap().equals(m_keyspaceColumnFamilyMap);
+            }
+        }
+
+        // Loop over all keyspaces and register a Cassandra persistence 
manager when there is none
+        // available for that keyspace
+        private void onKeyspaceAdded() throws InvalidSyntaxException, 
TException, InvalidRequestException, NotFoundException {
+            List<String> keyspaces = m_daemonService.getKeyspaces();
+            if (keyspaces != null) {
+                for (String keyspace : keyspaces) {
+                    String filter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+                    ServiceReference[] servRefs = 
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(), 
filter);
+                    if (servRefs == null || servRefs.length == 0) {
+                        // No Cassandra persistence manager available for this 
keyspace, register it now
+                        
m_pmFactory.createCassandraPersistenceManager(keyspace);
+                    }
+                }
+
+                // Now verify the keyspace global CF providers and add 
ColumnFamilies for new keyspaces when needed
+                verifyKeyspaceGlobalProviders();
+            }
+        }
+
+        // Special use case: when a ColumnFamilyProvider is registered as a 
keyspace global service (so keyspace equals null),
+        // the ColumnFamily should be present in all keyspaces. So when a 
keyspace is added, the ColumnFamily should be added too
+        // for such a provider
+        public void verifyKeyspaceGlobalProviders() throws NotFoundException, 
InvalidRequestException, TException, InvalidSyntaxException {
+            ServiceReference[] servRefs = 
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
+            if (servRefs != null) {
+                for (ServiceReference ref : servRefs) {
+                    ColumnFamilyProvider provider = (ColumnFamilyProvider) 
m_context.getService(ref);
+                    if (isKeyspaceGlobal(provider)) {
+                        List<String> keyspaces = 
m_daemonService.getKeyspaces();
+                        if (keyspaces != null) {
+                            for (String keyspace : keyspaces) {
+                                if (!Table.SYSTEM_TABLE.equals(keyspace)) {
+                                    // Verify that the ColumnFamily for this 
keyspace global provider is available in this keyspace
+                                    for (ColumnFamilyDefinition cfDef : 
provider.getColumnFamilies()) {
+                                        if 
(!m_daemonService.getColumnFamilies(keyspace).contains(cfDef.getName())) {
+                                            
m_logService.log(LogService.LOG_DEBUG, "Adding ColumnFamily '" + 
cfDef.getName() + "' to keyspace '"
+                                                + keyspace + "' for the 
keyspace-global ColumnFamilyProvider '" + provider.getClass().getName() + "'");
+                                            final String cfName = 
cfDef.getName();
+                                            String columnType = 
cfDef.getColumnType().value;
+                                            String comparatorType = 
cfDef.getCompareWith().value;
+                                            String subComparatorType = 
cfDef.getCompareSubcolumnsWith().value;
+                                            
m_daemonService.addColumnFamily(keyspace, cfName, columnType, comparatorType, 
subComparatorType);
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
+            for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
+                if (cfDef.getKeyspaces() == null) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        // Loop over all Cassandra Persistence Managers and unregister them if 
the corresponding keyspace
+        // is removed from Cassandra
+        private void onKeyspaceDropped() throws InvalidSyntaxException, 
TException, InvalidRequestException {
+            List<String> keyspaces = m_daemonService.getKeyspaces();
+            ServiceReference[] servRefs = 
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(), 
null);
+            if (servRefs != null) {
+                for (ServiceReference servRef : servRefs) {
+                    String keyspace = 
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+                    if (keyspaces == null || !keyspaces.contains(keyspace)){
+                        // No keyspace available for this Cassandra 
persistence manager, unregister the service now
+                        m_context.ungetService(servRef);
+                        m_logService.log(LogService.LOG_INFO, "Cassandra 
Persistence Manager service for keyspace '" + keyspace + "' unregistered");
+                    }
+                }
+            }
+        }
+
+        private void onColumnFamilyAdded() throws TException, 
InvalidRequestException, InvalidSyntaxException, NotFoundException {
+            List<String> keyspaces = m_daemonService.getKeyspaces();
+            if (keyspaces != null) {
+                for (String keyspace : keyspaces) {
+                    List<String> columnFamilies = 
m_daemonService.getColumnFamilies(keyspace);
+                    for (String columnFamily : columnFamilies) {
+                        String ksFilter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+                        String cfFilter = "(" + 
ColumnFamilyAvailable.FILTER_NAME  + "=" + columnFamily + ")";
+                        String filter = "(&" + ksFilter + cfFilter + ")";
+                        ServiceReference[] servRefs = 
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(), 
filter);
+                        if (servRefs == null || servRefs.length == 0) {
+                            registerColumnamilyAvailableService(keyspace, 
columnFamily);
+                        }
+                    }
+                }
+            }
+        }
+
+        private void onColumnFamilyRemoved() throws TException, 
InvalidRequestException, InvalidSyntaxException, NotFoundException {
+            ServiceReference[] servRefs = 
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(), null);
+            if (servRefs != null) {
+                for (ServiceReference servRef : servRefs) {
+                    String keyspace = 
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+                    String columnFamily = 
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
+                    boolean remove = false;
+                    try {
+                        if 
(!m_daemonService.getColumnFamilies(keyspace).contains(columnFamily)) {
+                            remove = true;
+                        }
+
+                    } catch (NotFoundException e) {
+                        // This exception is thrown when the keyspace could 
not be found anymore, in which
+                        // case also the ColumnFamilyAvailable for that 
keyspace should be removed
+                        remove = true;
+                    }
+                    if (remove) {
+                        m_context.ungetService(servRef);
+                        m_logService.log(LogService.LOG_INFO, 
"ColumnFamilyAvailable service for keyspace '" + keyspace
+                            + "' and ColumnFamily '" + columnFamily + "' 
unregistered");
+                    }
+                }
+            }
+        }
+
+        private void registerColumnamilyAvailableService(String keyspace, 
String columnFamily) {
+            Dictionary<String, String> serviceProps = new Hashtable<String, 
String>();
+            serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY, 
keyspace);
+            serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
+
+            Component component = m_dependencyManager.createComponent()
+            .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
+            .setImplementation(ColumnFamilyAvailableImpl.class);
+
+            m_dependencyManager.add(component);
+            m_logService.log(LogService.LOG_INFO, "ColumnFamilyAvailable 
service for  keyspace '" + keyspace + "' and ColumnFamily '" + columnFamily + 
"' registered");
+        }
+    }
+}

Modified: 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/ColumnFamilyHandler.java
      Wed Jan 19 15:17:10 2011
@@ -16,88 +16,29 @@
  */
 package org.amdatu.cassandra.listener.service;
 
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
-import java.util.Map;
 
 import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
 import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
 import org.amdatu.cassandra.listener.ColumnFamilyProvider;
-import org.amdatu.cassandra.persistencemanager.CassandraException;
-import org.amdatu.cassandra.persistencemanager.CassandraPersistenceManager;
-import 
org.amdatu.cassandra.persistencemanager.CassandraPersistenceManagerFactory;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
-import org.apache.felix.dm.Component;
-import org.apache.felix.dm.DependencyManager;
 import org.apache.thrift.TException;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventConstants;
-import org.osgi.service.event.EventHandler;
 import org.osgi.service.log.LogService;
-import org.osgi.util.tracker.ServiceTracker;
 
 /**
- * This class makes sure that all ColumnFamilies that are available in 
Cassandra get a
- * corresponding ColumnFamilyAvailable service. Also, it listens for 
ColumnFamilyDefinitions,
- * add the respective ColumnFamily to Cassandra, and registers the service as 
soon as the
- * ColumnFamily is actually available.
+ * This service listens to ColumnFamilyProvider services and adds or updates 
keyspaces or ColumnFamilies
+ * if they are not yet available in Cassandra. Note that this service is not 
responsible for registration
+ * of corresponding CassandraPersistenceManager and ColumnFamilyAvailable 
services, that is up to the
+ * CassandraUpdateListener since that should work independent of this OSGi 
framework (in a Cassandra cluster
+ * data is synchronized between Cassandra nodes).
  */
-public class ColumnFamilyHandler implements EventHandler {
-
+public class ColumnFamilyHandler {
+    // Service dependencies, injected by the dependency manager
     private volatile LogService m_logService;
     private volatile CassandraDaemonService m_daemonService;
-    private volatile CassandraPersistenceManagerFactory m_pmFactory;
-
-    private volatile DependencyManager m_dependencyManager;
-    private volatile BundleContext m_context;
-
-    private final Map<KeySpaceColumnFamilyCombination, Component> m_services =
-        new HashMap<KeySpaceColumnFamilyCombination, Component>();
-
-    @SuppressWarnings("unchecked")
-    public void init() {
-        Dictionary d = new Hashtable();
-        d.put(EventConstants.EVENT_TOPIC, new 
String[]{CassandraDaemonService.EVENT_ADMIN_TOPIC});
-        m_context.registerService( EventHandler.class.getName(), this, d );
-    }
-
-    public void start() {
-        try {
-            // Register all currently available keyspace/columnfamily 
combinations.
-            List<KsDef> keyspaces = 
m_daemonService.getCassandraServer().describe_keyspaces();
-            for (KsDef keyspace : keyspaces) {
-                String keyspaceName = keyspace.getName();
-                m_pmFactory.createCassandraPersistenceManager(keyspaceName);
-                List<CfDef> columnFamilies = keyspace.getCf_defs();
-                for (CfDef columnFamily : columnFamilies) {
-                    addServiceFor(keyspaceName, columnFamily.getName());
-                }
-            }
-        }
-        catch (TException e) {
-            m_logService.log(LogService.LOG_ERROR, "Error registering 
ColumnFamilyAvailable services for existing families.", e);
-        }
-        catch (InvalidRequestException e) {
-            m_logService.log(LogService.LOG_ERROR, "Error registering 
ColumnFamilyAvailable services for existing families.", e);
-        }
-    }
-
-    public void stop() {
-        // Since we registered all services, we should also remove them.
-        for (Component component : m_services.values()) {
-            m_dependencyManager.remove(component);
-        }
-    }
 
     public void columnFamilyProviderAdded(ColumnFamilyProvider provider) {
         try {
@@ -113,14 +54,15 @@
                         if (!Table.SYSTEM_TABLE.equals(keyspace)) {
                             // Create if it does not yet exist
                             m_daemonService.addKeyspace(keyspace);
-                            addOrUpdateColumnFamily(keyspace, columnFamily);
+                            addOrUpdateColumnFamily(m_daemonService, keyspace, 
columnFamily);
                         }
                     }
                 }
                 else {
+                    // Add to all available keyspaces
                     for (KsDef keyspaceDef : keyspaceDefinitions) {
                         if (!Table.SYSTEM_TABLE.equals(keyspaceDef.getName())) 
{
-                            addOrUpdateColumnFamily(keyspaceDef.getName(), 
columnFamily);
+                            addOrUpdateColumnFamily(m_daemonService, 
keyspaceDef.getName(), columnFamily);
                         }
                     }
                 }
@@ -144,21 +86,18 @@
         // We don't act on this yet.
     }
 
-    private void addOrUpdateColumnFamily(final String ksName, 
ColumnFamilyDefinition colDef) throws InvalidRequestException,
+    private void addOrUpdateColumnFamily(CassandraDaemonService daemonService, 
final String ksName, ColumnFamilyDefinition colDef) throws 
InvalidRequestException,
     TException, NotFoundException {
         final String cfName = colDef.getName();
         String columnType = colDef.getColumnType().value;
         String comparatorType = colDef.getCompareWith().value;
         String subComparatorType = colDef.getCompareSubcolumnsWith().value;
 
-        if (m_daemonService.addColumnFamily(ksName, cfName, columnType, 
comparatorType, subComparatorType)) {
-            waitForColumnFamilyAndRegisterService(ksName, cfName);
-        }
-        else {
+        if (!daemonService.addColumnFamily(ksName, cfName, columnType, 
comparatorType, subComparatorType)) {
             // Since Cassandra does not (yet) support updating columnType, 
comparatorType or subComparatorType
             // of existing ColumnFamily's, we throw an exception if one of 
these has been changed by the provider.
             // If there are no changes, we do nothing
-            if (m_daemonService.isColumnFamilyChanged(ksName, cfName, 
columnType, comparatorType, subComparatorType)) {
+            if (daemonService.isColumnFamilyChanged(ksName, cfName, 
columnType, comparatorType, subComparatorType)) {
                 throw new InvalidRequestException("Definition of ColumnFamily 
'" + cfName
                     + "' has been changed, but changes in columnType, 
comparatorType "
                     + "and subComparatorType are not supported by Cassandra");
@@ -166,142 +105,4 @@
             m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + cfName + "' not changed in keyspace '" + ksName + "'");
         }
     }
-
-    private void waitForColumnFamilyAndRegisterService(final String keyspace, 
final String columnFamily) {
-        new Thread("waiting for keyspace " + keyspace + ", columnfamily " + 
columnFamily) {
-            public void run() {
-                try {
-                    waitFor(keyspace, columnFamily);
-                }
-                catch (InterruptedException e) {
-                    // Right, someone wants us to stop. Abort!
-                    return;
-                }
-                addServiceFor(keyspace, columnFamily);
-            }
-        }.start();
-    }
-
-    private void waitFor(String keyspace, String columnFamily) throws 
InterruptedException {
-        CassandraPersistenceManager persistenceManager = 
getPersistenceManager(keyspace);
-
-        long startTime = System.currentTimeMillis();
-        boolean found = false;
-        while (System.currentTimeMillis() - startTime < 10000 && found == 
false) {
-            try {
-                found |= persistenceManager.exists(columnFamily);
-            }
-            catch (CassandraException e) {
-                // apparently our columnFamily isn't available yet... try 
again next round.
-            }
-
-            Thread.sleep(500);
-        }
-
-        if (!found) {
-            throw new IllegalStateException("After waiting for " + 10000 + 
"ms, columnfamily " +
-                columnFamily + " is not yet available.");
-        }
-    }
-
-    private CassandraPersistenceManager getPersistenceManager(String keyspace) 
throws InterruptedException {
-        String objectClassFilter = "(" + 
org.osgi.framework.Constants.OBJECTCLASS + "="
-        + CassandraPersistenceManager.class.getName() + ")";
-        String keyspaceFilter = "(" + 
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
-        String persistenceManagerFilter = "(&" + objectClassFilter + 
keyspaceFilter + ")";
-
-        ServiceTracker tracker;
-        try {
-            tracker = new ServiceTracker(m_context, 
m_context.createFilter(persistenceManagerFilter), null);
-        }
-        catch (InvalidSyntaxException e) {
-            /*
-             * This should not happen, since we construct the filter 
ourselves. If it does, the keyspace or
-             * columnFamily is invalid.
-             */
-            m_logService.log(LogService.LOG_ERROR, "Could not create filter: " 
+ persistenceManagerFilter, e);
-            return null; // let it NPE one level up.
-        }
-        tracker.open();
-
-        CassandraPersistenceManager persistenceManager = null;
-        try {
-            persistenceManager = (CassandraPersistenceManager) 
tracker.waitForService(5000);
-        }
-        finally {
-            tracker.close();
-        }
-
-        if (persistenceManager == null) {
-            throw new IllegalStateException("After waiting for " + 5000 + "ms, 
we don't have a "
-                + "persistencemanager for " + keyspace + " yet.");
-        }
-        return persistenceManager;
-    }
-
-    private void addServiceFor(String keyspace, String columnFamily) {
-        if (m_services.containsKey(new 
KeySpaceColumnFamilyCombination(keyspace, columnFamily))) {
-            return;
-        }
-
-        Dictionary<String, String> serviceProps = new Hashtable<String, 
String>();
-        serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY, 
keyspace);
-        serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
-
-        Component component = m_dependencyManager.createComponent()
-        .setInterface(ColumnFamilyAvailable.class.getName(), serviceProps)
-        .setImplementation(ColumnFamilyAvailableImpl.class);
-
-        m_services.put(new KeySpaceColumnFamilyCombination(keyspace, 
columnFamily), component);
-        m_dependencyManager.add(component);
-        m_logService.log(LogService.LOG_INFO, "ColumnFamily '" + columnFamily 
+ "' is now available in keyspace '" + keyspace + "'");
-    }
-
-    private static class KeySpaceColumnFamilyCombination extends 
HashMap<String, Object> {
-        private static final long serialVersionUID = 6574039194678276636L;
-
-        public KeySpaceColumnFamilyCombination(String keyspace, String 
columnFamily) {
-            put("keyspace", keyspace);
-            put("columnFamily", columnFamily);
-        }
-    }
-
-    private boolean isKeyspaceGlobal(ColumnFamilyProvider provider) {
-        for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
-            if (cfDef.getKeyspaces() == null) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    @Override
-    public void handleEvent(Event event) {
-        try {
-            Object keyspaceAdded = 
event.getProperty(CassandraDaemonService.EVENT_ADMIN_KEYSPACE_ADDED);
-            if (keyspaceAdded != null) {
-                // If a keyspace was added, we must add ColumnFamily's for all 
keyspace-global ColumnFamilyProvider's
-                // (this are all ColumnFamilyProvider's that defined keyspace 
null)
-                String keyspaceName = keyspaceAdded.toString();
-                m_logService.log(LogService.LOG_DEBUG, "Recieved keyspace 
added event for keyspace '" + keyspaceName + "' ");
-
-                // First register a new cassandra persistence manager for this 
new keyspace
-                m_pmFactory.createCassandraPersistenceManager(keyspaceName);
-
-                ServiceReference[] servRefs = 
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
-                if (servRefs != null) {
-                    for (ServiceReference ref : servRefs) {
-                        ColumnFamilyProvider provider = (ColumnFamilyProvider) 
m_context.getService(ref);
-                        if (isKeyspaceGlobal(provider)) {
-                            m_logService.log(LogService.LOG_DEBUG, "Updating 
ColumnFamilyProvider '" + provider.getClass().getName() + "' as it is 
keyspace-global");
-                            columnFamilyProviderAdded(provider);
-                        }
-                    }
-                }
-            }
-        }
-        catch (InvalidSyntaxException e) {
-            m_logService.log(LogService.LOG_ERROR, "Could not handle event '" 
+ event.getTopic() + "' ", e);
-        }
-    }
 }

Modified: trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java
==============================================================================
--- trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java       (original)
+++ trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerFactoryImpl.java       Wed Jan 19 15:17:10 2011
@@ -49,5 +49,6 @@
             .setRequired(true));
 
         m_dependencyManager.add(component);
+        m_logService.log(LogService.LOG_INFO, "Cassandra Persistence Manager service for keyspace '" + keyspaceId + "' registered");
     }
 }

Reply via email to