Author: [email protected]
Date: Fri Feb 17 11:03:52 2012
New Revision: 2096

Log:
[AMDATUCASSANDRA-166] Added runtime configuration update support

Added:
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
Modified:
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
  Fri Feb 17 11:03:52 2012
@@ -153,4 +153,11 @@
      * @return the complete Cassandra host configuration.
      */
     CassandraHostConfigurator getCassandraHostConfigurator();
+    
+    /**
+     * Append a configuration update listener. Listeners are invoked as soon 
as the configuration
+     * is updated.
+     * @param listener the listener
+     */
+    void addListener(ConfigurationUpdateListener listener);
 }

Modified: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
      Fri Feb 17 11:03:52 2012
@@ -28,7 +28,9 @@
 import org.amdatu.cassandra.client.util.ConfigurationUtil;
 import org.amdatu.cassandra.client.util.ConfigurationUtil.ConfigProperty;
 
+import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.List;
 
 import org.osgi.service.cm.ConfigurationException;
 import org.osgi.service.cm.ManagedService;
@@ -36,11 +38,14 @@
 
 @SuppressWarnings("rawtypes")
 /**
- * This class manages the configuration for this bundle.
+ * This class manages the configuration for this bundle. Since this bundle 
contains multiple services that
+ * depend on this configuration, we must use such a shared configuration 
service for this bundle.
  * 
  * @author <a href="mailto:[email protected]">Amdatu Project 
Team</a>
  */
 public class CassandraClientConfigurationServiceImpl implements 
CassandraClientConfigurationService, ManagedService {
+    private List<ConfigurationUpdateListener> m_listeners = new 
ArrayList<ConfigurationUpdateListener>();
+    
     // Definition of available configuration properties with types and defaults
     private static ConfigProperty[] CONFIG_PROPERTIES =
         new ConfigProperty[] {
@@ -134,12 +139,17 @@
             m_config = dictionary;
         }
         else {
-            // This is a runtime update of the configuration, which is not 
supported
-            // since the Hector holds a static reference to the configuration. 
Only
-            // a stop/start of the bundle will reload the class and thus 
reload the
-            // configuration
-            m_logService.log(LogService.LOG_WARNING, "This service does not 
support runtime updates of" +
-                "configuration properties. You must stop/start the bundle 
before the changes are effective.");
+            m_config = dictionary;
+            m_configUtil.init(CONFIG_PROPERTIES, m_config);
+            m_hostConfig = null;
+            
+            for (ConfigurationUpdateListener listener : m_listeners) {
+                listener.configurationUpdated();
+            }
         }
     }
+    
+    public void addListener(ConfigurationUpdateListener listener) {
+        m_listeners.add(listener);
+    }
 }

Modified: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
   Fri Feb 17 11:03:52 2012
@@ -47,14 +47,14 @@
  * 
  * @author ivol
  */
-public class CassandraClientServiceImpl implements CassandraClientService {
+public class CassandraClientServiceImpl implements CassandraClientService, 
ConfigurationUpdateListener {
     // Defaults
     private static final String SYSTEM_TABLE = "system";
     private static final String UNREACHABLE = "UNREACHABLE";
 
     // Map of clusters for this client service
-    private static final Map<String, Cluster> m_clusters = new HashMap<String, 
Cluster>();
-    
+    private static Cluster m_cluster = null;
+
     // Service dependencies, injected by the framework
     private volatile LogService m_logService;
     private volatile EventAdmin m_eventAdmin;
@@ -63,7 +63,6 @@
     // Local cache of ColumnTypes of all ColumnFamilies for each keyspace
     private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new 
HashMap<String, Map<String, String>>();
 
-    
     /**
      * The init() method is invoked by the Felix dependency manager.
      */
@@ -72,6 +71,7 @@
     }
 
     public void start() {
+        m_configuration.addListener(this);
         m_logService.log(LogService.LOG_INFO, getClass().getName() + "service 
started");
     }
 
@@ -95,30 +95,45 @@
         m_eventAdmin = eventAdmin;
     }
 
+    public synchronized void configurationUpdated() {
+        // This is a runtime update of the configuration, we shutdown the 
cluster. It will be recreated
+        // with the new configuration on the next request
+        m_logService.log(LogService.LOG_INFO, "Configuration of the Cassandra 
Client has changed, restarting Cluster " +
+            "to apply the new configuration. This may take some time.");
+
+        // Shutdown the cluster, but ONLY if it exists!
+        Cluster cluster = getHectorCluster(false);
+        if (cluster != null) {
+            m_cluster.getConnectionManager().shutdown();
+            m_cluster = null;
+        }
+    }
+
     // Returns the Hector cluster object
     private Cluster getHectorCluster() {
+        return getHectorCluster(true);
+    }
+
+    private synchronized Cluster getHectorCluster(boolean create) {
         ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             // Perform a classloader switch to prevent log4j trying to load 
classes from the system classloader
             // instead of the bundle classloader.
             
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
             String clusterName = m_configuration.get(CLUSTER_NAME, 
String.class);
-            return getOrCreateCluster(clusterName, 
m_configuration.getCassandraHostConfigurator());
+            return getOrCreateCluster(clusterName, 
m_configuration.getCassandraHostConfigurator(), create);
         }
         finally {
             Thread.currentThread().setContextClassLoader(oldClassLoader);
         }
     }
-    
-    private static Cluster getOrCreateCluster(String clusterName, 
CassandraHostConfigurator cassandraHostConfigurator) {
-        synchronized (m_clusters) {
-            Cluster cluster = m_clusters.get(clusterName);
-            if (cluster == null) {
-                cluster = new ThriftCluster(clusterName, 
cassandraHostConfigurator);
-                m_clusters.put(clusterName, cluster);
-            }
-            return cluster;
+
+    private synchronized static Cluster getOrCreateCluster(String clusterName,
+        CassandraHostConfigurator cassandraHostConfigurator, boolean create) {
+        if (m_cluster == null && create) {
+            m_cluster = new ThriftCluster(clusterName, 
cassandraHostConfigurator);
         }
+        return m_cluster;
     }
 
     public synchronized boolean keyspaceExists(final String keyspaceName) {
@@ -283,7 +298,7 @@
         try {
             List<KeyspaceDefinition> keyspaces = 
getHectorCluster().describeKeyspaces();
             for (KeyspaceDefinition keyspace : keyspaces) {
-                if (keyspace.getReplicationFactor() !=  replicationFactor) {
+                if (keyspace.getReplicationFactor() != replicationFactor) {
                     BasicKeyspaceDefinition newKsDef = new 
BasicKeyspaceDefinition();
                     newKsDef.setName(keyspace.getName());
                     newKsDef.setReplicationFactor(replicationFactor);
@@ -300,7 +315,7 @@
     public synchronized void setReplicationFactor(final String keyspace, final 
int replicationFactor) {
         try {
             KeyspaceDefinition ksDef = 
getHectorCluster().describeKeyspace(keyspace);
-            if (ksDef.getReplicationFactor() !=  replicationFactor) {
+            if (ksDef.getReplicationFactor() != replicationFactor) {
                 BasicKeyspaceDefinition newKsDef = new 
BasicKeyspaceDefinition();
                 newKsDef.setName(ksDef.getName());
                 newKsDef.setReplicationFactor(replicationFactor);

Added: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
  Fri Feb 17 11:03:52 2012
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.client.service;
+
+/**
+ * Listener interface for configuration updates. Invoked on all services that 
use the shared
+ * Configuration service as soon as the configuration is updated.
+ * 
+ * @author <a href="mailto:[email protected]">Amdatu Project 
Team</a>
+ */
+public interface ConfigurationUpdateListener {
+    /**
+     * This method is invoked as soon as the configuration of this bundle is 
updated.
+     */
+    public void configurationUpdated();
+}

Modified: 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
  Fri Feb 17 11:03:52 2012
@@ -169,12 +169,20 @@
             m_config = dictionary;
         }
         else {
-            // This is a runtime update of the configuration, which is not 
supported
-            // since the Hector holds a static reference to the configuration. 
Only
-            // a stop/start of the bundle will reload the class and thus 
reload the
-            // configuration
-            getLogService().log(LogService.LOG_WARNING, "This service does not 
support runtime updates of" +
-                "configuration properties. You must stop/start the bundle 
before the changes are effective.");
+            m_config = dictionary;
+            m_configUtil.init(CONFIG_PROPERTIES, m_config);
+            m_hostConfig = null;
+            
+            // This is a runtime update of the configuration, we shutdown the 
cluster. It will be recreated
+            // with the new configuration on the next request
+            getLogService().log(LogService.LOG_INFO, "Configuration of the 
Hector PM has changed, restarting Cluster " +
+                       "to apply the new configuration. This may take some 
time.");
+            
+            // Shutdown the cluster, but ONLY if it exists!
+            Cluster cluster = getHectorCluster(false);
+            if (cluster != null) {
+                HFactory.shutdownCluster(cluster);
+            }
         }
     }
        
@@ -215,13 +223,21 @@
     }
 
     public Cluster getHectorCluster() {
+        return getHectorCluster(true);
+    }
+    
+    public Cluster getHectorCluster(boolean create) {
         ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
             // Perform a classloader switch to prevent log4j trying to load 
classes from the system classloader
             // instead of the bundle classloader.
             
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
             String clusterName = m_configUtil.get(CLUSTER_NAME, String.class);
-            return HFactory.getOrCreateCluster(clusterName, 
getCassandraHostConfigurator());
+            if (create) {
+                return HFactory.getOrCreateCluster(clusterName, 
getCassandraHostConfigurator());
+            } else {
+                return HFactory.getCluster(clusterName);
+            }
         }
         finally {
             Thread.currentThread().setContextClassLoader(oldClassLoader);
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to