Author: [email protected]
Date: Fri Feb 17 11:03:52 2012
New Revision: 2096
Log:
[AMDATUCASSANDRA-166] Added runtime configuration update support
Added:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationService.java
Fri Feb 17 11:03:52 2012
@@ -153,4 +153,11 @@
* @return the complete Cassandra host configuration.
*/
CassandraHostConfigurator getCassandraHostConfigurator();
+
+ /**
+ * Append a configuration update listener. Listeners are invoked as soon
as the configuration
+ * is updated.
+ * @param listener the listener
+ */
+ void addListener(ConfigurationUpdateListener listener);
}
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientConfigurationServiceImpl.java
Fri Feb 17 11:03:52 2012
@@ -28,7 +28,9 @@
import org.amdatu.cassandra.client.util.ConfigurationUtil;
import org.amdatu.cassandra.client.util.ConfigurationUtil.ConfigProperty;
+import java.util.ArrayList;
import java.util.Dictionary;
+import java.util.List;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedService;
@@ -36,11 +38,14 @@
@SuppressWarnings("rawtypes")
/**
- * This class manages the configuration for this bundle.
+ * This class manages the configuration for this bundle. Since this bundle
contains multipel services that
+ * depend on this configuration, we must use such a shared configruation
service for this bundle.
*
* @author <a href="mailto:[email protected]">Amdatu Project
Team</a>
*/
public class CassandraClientConfigurationServiceImpl implements
CassandraClientConfigurationService, ManagedService {
+ private List<ConfigurationUpdateListener> m_listeners = new
ArrayList<ConfigurationUpdateListener>();
+
// Definition of available configuration properties with types and defaults
private static ConfigProperty[] CONFIG_PROPERTIES =
new ConfigProperty[] {
@@ -134,12 +139,17 @@
m_config = dictionary;
}
else {
- // This is a runtime update of the configuration, which is not
supported
- // since the Hector holds a static reference to the configuration.
Only
- // a stop/start of the bundle will reload the class and thus
reload the
- // configuration
- m_logService.log(LogService.LOG_WARNING, "This service does not
support runtime updates of" +
- "configuration properties. You must stop/start the bundle
before the changes are effective.");
+ m_config = dictionary;
+ m_configUtil.init(CONFIG_PROPERTIES, m_config);
+ m_hostConfig = null;
+
+ for (ConfigurationUpdateListener listener : m_listeners) {
+ listener.configurationUpdated();
+ }
}
}
+
+ public void addListener(ConfigurationUpdateListener listener) {
+ m_listeners.add(listener);
+ }
}
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
Fri Feb 17 11:03:52 2012
@@ -47,14 +47,14 @@
*
* @author ivol
*/
-public class CassandraClientServiceImpl implements CassandraClientService {
+public class CassandraClientServiceImpl implements CassandraClientService,
ConfigurationUpdateListener {
// Defaults
private static final String SYSTEM_TABLE = "system";
private static final String UNREACHABLE = "UNREACHABLE";
// Map of clusters for this client service
- private static final Map<String, Cluster> m_clusters = new HashMap<String,
Cluster>();
-
+ private static Cluster m_cluster = null;
+
// Service dependencies, injected by the framework
private volatile LogService m_logService;
private volatile EventAdmin m_eventAdmin;
@@ -63,7 +63,6 @@
// Local cache of ColumnTypes of all ColumnFamilies for each keyspace
private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new
HashMap<String, Map<String, String>>();
-
/**
* The init() method is invoked by the Felix dependency manager.
*/
@@ -72,6 +71,7 @@
}
public void start() {
+ m_configuration.addListener(this);
m_logService.log(LogService.LOG_INFO, getClass().getName() + "service
started");
}
@@ -95,30 +95,45 @@
m_eventAdmin = eventAdmin;
}
+ public synchronized void configurationUpdated() {
+ // This is a runtime update of the configuration, we shutdown the
cluster. It will be recreated
+ // with the new configuration on the next request
+ m_logService.log(LogService.LOG_INFO, "Configuration of the Cassandra
Client has changed, restarting Cluster " +
+ "to apply the new configuration. This may take some time.");
+
+ // Shutdown the cluster, but ONLY if its exists!
+ Cluster cluster = getHectorCluster(false);
+ if (cluster != null) {
+ m_cluster.getConnectionManager().shutdown();
+ m_cluster = null;
+ }
+ }
+
// Returns the Hector cluster object
private Cluster getHectorCluster() {
+ return getHectorCluster(true);
+ }
+
+ private synchronized Cluster getHectorCluster(boolean create) {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
try {
// Perform a classloader switch to prevent log4j trying to load
classes from the system classloader
// instead of the bundle classloader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
String clusterName = m_configuration.get(CLUSTER_NAME,
String.class);
- return getOrCreateCluster(clusterName,
m_configuration.getCassandraHostConfigurator());
+ return getOrCreateCluster(clusterName,
m_configuration.getCassandraHostConfigurator(), create);
}
finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
-
- private static Cluster getOrCreateCluster(String clusterName,
CassandraHostConfigurator cassandraHostConfigurator) {
- synchronized (m_clusters) {
- Cluster cluster = m_clusters.get(clusterName);
- if (cluster == null) {
- cluster = new ThriftCluster(clusterName,
cassandraHostConfigurator);
- m_clusters.put(clusterName, cluster);
- }
- return cluster;
+
+ private synchronized static Cluster getOrCreateCluster(String clusterName,
+ CassandraHostConfigurator cassandraHostConfigurator, boolean create) {
+ if (m_cluster == null && create) {
+ m_cluster = new ThriftCluster(clusterName,
cassandraHostConfigurator);
}
+ return m_cluster;
}
public synchronized boolean keyspaceExists(final String keyspaceName) {
@@ -283,7 +298,7 @@
try {
List<KeyspaceDefinition> keyspaces =
getHectorCluster().describeKeyspaces();
for (KeyspaceDefinition keyspace : keyspaces) {
- if (keyspace.getReplicationFactor() != replicationFactor) {
+ if (keyspace.getReplicationFactor() != replicationFactor) {
BasicKeyspaceDefinition newKsDef = new
BasicKeyspaceDefinition();
newKsDef.setName(keyspace.getName());
newKsDef.setReplicationFactor(replicationFactor);
@@ -300,7 +315,7 @@
public synchronized void setReplicationFactor(final String keyspace, final
int replicationFactor) {
try {
KeyspaceDefinition ksDef =
getHectorCluster().describeKeyspace(keyspace);
- if (ksDef.getReplicationFactor() != replicationFactor) {
+ if (ksDef.getReplicationFactor() != replicationFactor) {
BasicKeyspaceDefinition newKsDef = new
BasicKeyspaceDefinition();
newKsDef.setName(ksDef.getName());
newKsDef.setReplicationFactor(replicationFactor);
Added:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/ConfigurationUpdateListener.java
Fri Feb 17 11:03:52 2012
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2010, 2011 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.client.service;
+
+/**
+ * Listener interface for configuration updates. Invoked on all services that
use the shared
+ * Configuration service as soon as the configuration is updated.
+ *
+ * @author <a href="mailto:[email protected]">Amdatu Project
Team</a>
+ */
+public interface ConfigurationUpdateListener {
+ /**
+ * This method is invoked as soon as the configuration of this bundle is
updated.
+ */
+ public void configurationUpdated();
+}
Modified:
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-persistencemanager-hector/src/main/java/org/amdatu/cassandra/persistencemanager/hector/service/HectorCassandraPersistenceManagerImpl.java
Fri Feb 17 11:03:52 2012
@@ -169,12 +169,20 @@
m_config = dictionary;
}
else {
- // This is a runtime update of the configuration, which is not
supported
- // since the Hector holds a static reference to the configuration.
Only
- // a stop/start of the bundle will reload the class and thus
reload the
- // configuration
- getLogService().log(LogService.LOG_WARNING, "This service does not
support runtime updates of" +
- "configuration properties. You must stop/start the bundle
before the changes are effective.");
+ m_config = dictionary;
+ m_configUtil.init(CONFIG_PROPERTIES, m_config);
+ m_hostConfig = null;
+
+ // This is a runtime update of the configuration, we shutdown the
cluster. It will be recreated
+ // with the new configuration on the next request
+ getLogService().log(LogService.LOG_INFO, "Configuration of the
Hector PM has changed, restarting Cluster " +
+ "to apply the new configuration. This may take some
time.");
+
+ // Shutdown the cluster, but ONLY if it exists!
+ Cluster cluster = getHectorCluster(false);
+ if (cluster != null) {
+ HFactory.shutdownCluster(cluster);
+ }
}
}
@@ -215,13 +223,21 @@
}
public Cluster getHectorCluster() {
+ return getHectorCluster(true);
+ }
+
+ public Cluster getHectorCluster(boolean create) {
ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
try {
// Perform a classloader switch to prevent log4j trying to load
classes from the system classloader
// instead of the bundle classloader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
String clusterName = m_configUtil.get(CLUSTER_NAME, String.class);
- return HFactory.getOrCreateCluster(clusterName,
getCassandraHostConfigurator());
+ if (create) {
+ return HFactory.getOrCreateCluster(clusterName,
getCassandraHostConfigurator());
+ } else {
+ return HFactory.getCluster(clusterName);
+ }
}
finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits