Author: [email protected]
Date: Wed Feb 15 13:46:47 2012
New Revision: 2088
Log:
[AMDATUCASSANDRA-158] Fixed catching runtime exceptions
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-listener/src/main/java/org/amdatu/cassandra/listener/service/CassandraUpdateListenerImpl.java
Wed Feb 15 13:46:47 2012
@@ -13,8 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.amdatu.cassandra.listener.service;
-
+package org.amdatu.cassandra.listener.service;
+
import org.amdatu.cassandra.client.CassandraClientService;
import org.amdatu.cassandra.listener.ColumnFamilyAvailable;
import org.amdatu.cassandra.listener.ColumnFamilyDefinition;
@@ -39,274 +39,262 @@
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.log.LogService;
-
-/**
- * This class is responsible for listening to changes (add or remove) in the
Cassandra database with respect to
- * keyspaces and ColumnFamilies. When a keyspace is added, it will register a
new CassandraPersistenceManager.
- * When a keyspace is dropped, it will unregister the corresponding
CassandraPersistenceManager.
- * When a ColumnFamily is added, it will register a new ColumnFamilyAvailable
service. When a ColumnFamily is dropped
- * it will unregister the corresponding ColumnFamilyAvailable service.
- * Note that in a clustered setup, keyspaces and column families can be added
or removed by other nodes in the cluster.
- * Since there is no event mechanism for triggering those changes, we create
an inspect thread that inspects available
- * keyspaces and column families each x seconds.
- *
- * @author ivol
- */
+
+/**
+ * This class is responsible for listening to changes (add or remove) in the
Cassandra database with respect to
+ * keyspaces and ColumnFamilies. When a keyspace is added, it will register a
new CassandraPersistenceManager.
+ * When a keyspace is dropped, it will unregister the corresponding
CassandraPersistenceManager.
+ * When a ColumnFamily is added, it will register a new ColumnFamilyAvailable
service. When a ColumnFamily is dropped
+ * it will unregister the corresponding ColumnFamilyAvailable service.
+ * Note that in a clustered setup, keyspaces and column families can be added
or removed by other nodes in the cluster.
+ * Since there is no event mechanism for triggering those changes, we create
an inspect thread that inspects available
+ * keyspaces and column families each x seconds.
+ *
+ * @author ivol
+ */
public class CassandraUpdateListenerImpl {
private static final String SYSTEM_TABLE = "system";
-
- // Services injected by the dependency manager
- private volatile LogService m_logService;
- private volatile CassandraClientService m_daemonService;
- private volatile CassandraPersistenceManagerFactory m_pmFactory;
- private volatile BundleContext m_context;
- private volatile DependencyManager m_dependencyManager;
-
- // The thread that inspects the Cassandra database for changes
- private InspectKeyspaceColumnFamilyThread m_inspectThread;
-
- // The interval for each individual inspect
- private static int INSPECT_INTERVAL = 5000;
-
- private Map<String, Component> m_componentMap = new
ConcurrentHashMap<String, Component>();
-
- public void start() {
- // Now start the inspect thread
- m_inspectThread = new InspectKeyspaceColumnFamilyThread();
- m_inspectThread.start();
- }
-
- public void stop() {
- // Stop the inspect thread
- m_inspectThread.interrupt();
+
+ // Services injected by the dependency manager
+ private volatile LogService m_logService;
+ private volatile CassandraClientService m_daemonService;
+ private volatile CassandraPersistenceManagerFactory m_pmFactory;
+ private volatile BundleContext m_context;
+ private volatile DependencyManager m_dependencyManager;
+
+ // The thread that inspects the Cassandra database for changes
+ private InspectKeyspaceColumnFamilyThread m_inspectThread;
+
+ // The interval for each individual inspect
+ private static int INSPECT_INTERVAL = 5000;
+
+ private Map<String, Component> m_componentMap = new
ConcurrentHashMap<String, Component>();
+
+ public void start() {
+ // Now start the inspect thread
+ m_inspectThread = new InspectKeyspaceColumnFamilyThread();
+ m_inspectThread.start();
}
-
+
+ public void stop() {
+ // Stop the inspect thread
+ m_inspectThread.interrupt();
+ }
+
public void setDaemonService(CassandraClientService daemonService) {
m_daemonService = daemonService;
}
-
+
public void
setPersistenceManagerFactory(CassandraPersistenceManagerFactory factory) {
m_pmFactory = factory;
}
-
+
public void setBundleContext(BundleContext context) {
m_context = context;
- }
+ }
public void setDependencyManager(DependencyManager depMgr) {
m_dependencyManager = depMgr;
}
-
+
public void setLogService(LogService logService) {
m_logService = logService;
}
-
- /**
- * This Thread inspects available Keyspaces and ColumnFamilies in
Cassandra and compares these to
- * the registered CassandraPersistenceManager and ColumnFamilyAvailable
services. Since other nodes
- * in the cluster may add, update or drop keyspaces and column families
and Cassandra does not support
- * any event mechanism to act upon these events we will continuously
inspect this ourselves.
- *
- * @author ivol
- *
- */
- class InspectKeyspaceColumnFamilyThread extends Thread {
- private Map<String, List<String>> m_snapshot = new HashMap<String,
List<String>>();
-
- @Override
- public void run() {
- try {
- while (!isInterrupted()) {
- // Inspect available keyspaces
- try {
- // Only compare keyspaces with CPM's if there was a
keyspace change in the meantime
- // We make a snapshot before we start checking if any
updates are needed and performing them.
- // Updates are needed when the snapshot taken now
differs from the one taken previously
- Map<String, List<String>> newSnapshot = getSnapshot();
- if (!m_snapshot.equals(newSnapshot)) {
- m_snapshot = newSnapshot;
- onKeyspaceAdded();
- onKeyspaceDropped();
- onColumnFamilyAdded();
- onColumnFamilyRemoved();
- }
- else {
- m_snapshot = newSnapshot;
- }
- }
- catch (TException e) {
- m_logService.log(LogService.LOG_ERROR,
- "Could not retrieve keyspaces. Cause: " +
e.getMessage(), e);
- }
- catch (InvalidRequestException e) {
- m_logService.log(LogService.LOG_ERROR, "Could not
retrieve keyspaces. Cause: " + e.why, e);
- }
- catch (InvalidSyntaxException e) {
- m_logService.log(LogService.LOG_ERROR,
- "Could not retrieve Cassandra Persistence Manager
services. Cause: " + e.getMessage(), e);
- }
- catch (NotFoundException e) {
- m_logService.log(
- LogService.LOG_ERROR,
- "An error occurred while synchronizing
ColumnFamilyAvailable services. Cause: "
- + e.getMessage(), e);
- }
-
- Thread.sleep(INSPECT_INTERVAL);
- }
- }
- catch (InterruptedException e) {
- m_logService.log(LogService.LOG_INFO, "Cassandra update
listener thread interrupted.");
- }
- }
-
- // Creates and returns a snapshot of the currently available keyspaces
and column families
- private Map<String, List<String>> getSnapshot() throws TException,
InvalidRequestException, NotFoundException {
- Map<String, List<String>> map = new HashMap<String,
List<String>>();
+
+ /**
+ * This Thread inspects available Keyspaces and ColumnFamilies in
Cassandra and compares these to
+ * the registered CassandraPersistenceManager and ColumnFamilyAvailable
services. Since other nodes
+ * in the cluster may add, update or drop keyspaces and column families
and Cassandra does not support
+ * any event mechanism to act upon these events we will continuously
inspect this ourselves.
+ *
+ * @author ivol
+ *
+ */
+ class InspectKeyspaceColumnFamilyThread extends Thread {
+ private Map<String, List<String>> m_snapshot = new HashMap<String,
List<String>>();
+
+ @Override
+ public void run() {
+ try {
+ while (!isInterrupted()) {
+ // Inspect available keyspaces
+ try {
+ // Only compare keyspaces with CPM's if there was a
keyspace change in the meantime
+ // We make a snapshot before we start checking if any
updates are needed and performing them.
+ // Updates are needed when the snapshot taken now
differs from the one taken previously
+ Map<String, List<String>> newSnapshot = getSnapshot();
+ if (!m_snapshot.equals(newSnapshot)) {
+ m_snapshot = newSnapshot;
+ onKeyspaceAdded();
+ onKeyspaceDropped();
+ onColumnFamilyAdded();
+ onColumnFamilyRemoved();
+ }
+ else {
+ m_snapshot = newSnapshot;
+ }
+ }
+ catch (Exception e) {
+ m_logService.log(LogService.LOG_ERROR,
+ "An error occurred while checking for Keyspace
and/or ColumnFamily updates. Cause: "
+ + e.getMessage(), e);
+ }
+
+ Thread.sleep(INSPECT_INTERVAL);
+ }
+ }
+ catch (InterruptedException e) {
+ m_logService.log(LogService.LOG_INFO, "Cassandra update
listener thread interrupted.");
+ }
+ }
+
+ // Creates and returns a snapshot of the currently available keyspaces
and column families
+ private Map<String, List<String>> getSnapshot() throws TException,
InvalidRequestException, NotFoundException {
+ Map<String, List<String>> map = new HashMap<String,
List<String>>();
for (String keyspace : m_daemonService.getKeyspaces()) {
// Ignore the system keyspace
- if (!SYSTEM_TABLE.equals(keyspace)) {
+ if (!SYSTEM_TABLE.equals(keyspace)) {
map.put(keyspace,
m_daemonService.getColumnFamilies(keyspace));
- }
- }
- return map;
- }
-
- // Loop over all keyspaces and register a Cassandra persistence
manager when there is none
- // available for that keyspace
- private void onKeyspaceAdded() throws InvalidSyntaxException,
TException, InvalidRequestException,
- NotFoundException {
- Set<String> keyspaces = m_snapshot.keySet();
- if (keyspaces != null) {
- for (String keyspace : keyspaces) {
- String filter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
- ServiceReference[] servRefs =
-
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
filter);
- if (servRefs == null || servRefs.length == 0) {
- // No Cassandra persistence manager available for this
keyspace, register it now
-
m_pmFactory.createCassandraPersistenceManager(keyspace);
- }
- }
-
- // Now verify the keyspace global CF providers and add
ColumnFamilies for new keyspaces when needed
- verifyKeyspaceGlobalProviders();
- }
- }
-
- // Special use case: when a ColumnFamilyProvider is registered as a
keyspace global service (so keyspace
- // equals null), the ColumnFamily should be present in all keyspaces.
So when a keyspace is added, the
- // ColumnFamily should be added too for such a provider
- public void verifyKeyspaceGlobalProviders() throws NotFoundException,
InvalidRequestException, TException,
- InvalidSyntaxException {
- ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
- if (servRefs != null) {
- for (ServiceReference ref : servRefs) {
- ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
- if (isKeyspaceGlobal(provider)) {
- Set<String> keyspaces = m_snapshot.keySet();
- if (keyspaces != null) {
- for (String keyspace : keyspaces) {
- // Verify that the ColumnFamily for this
keyspace global provider is available
- // in this keyspace
+ }
+ }
+ return map;
+ }
+
+ // Loop over all keyspaces and register a Cassandra persistence
manager when there is none
+ // available for that keyspace
+ private void onKeyspaceAdded() throws InvalidSyntaxException,
TException, InvalidRequestException,
+ NotFoundException {
+ Set<String> keyspaces = m_snapshot.keySet();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ String filter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+ ServiceReference[] servRefs =
+
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
filter);
+ if (servRefs == null || servRefs.length == 0) {
+ // No Cassandra persistence manager available for this
keyspace, register it now
+
m_pmFactory.createCassandraPersistenceManager(keyspace);
+ }
+ }
+
+ // Now verify the keyspace global CF providers and add
ColumnFamilies for new keyspaces when needed
+ verifyKeyspaceGlobalProviders();
+ }
+ }
+
+ // Special use case: when a ColumnFamilyProvider is registered as a
keyspace global service (so keyspace
+ // equals null), the ColumnFamily should be present in all keyspaces.
So when a keyspace is added, the
+ // ColumnFamily should be added too for such a provider
+ public void verifyKeyspaceGlobalProviders() throws NotFoundException,
InvalidRequestException, TException,
+ InvalidSyntaxException {
+ ServiceReference[] servRefs =
m_context.getAllServiceReferences(ColumnFamilyProvider.class.getName(), null);
+ if (servRefs != null) {
+ for (ServiceReference ref : servRefs) {
+ ColumnFamilyProvider provider = (ColumnFamilyProvider)
m_context.getService(ref);
+ if (isKeyspaceGlobal(provider)) {
+ Set<String> keyspaces = m_snapshot.keySet();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ // Verify that the ColumnFamily for this
keyspace global provider is available
+ // in this keyspace
for (ColumnFamilyDefinition cfDef :
provider.getColumnFamilies()) {
- String cfName = cfDef.getName();
- if
(!m_snapshot.get(keyspace).contains(cfName)) {
- m_logService.log(LogService.LOG_DEBUG,
- "Adding ColumnFamily '" +
cfDef.getName() + "' to keyspace '"
- + keyspace + "' for the
keyspace-global ColumnFamilyProvider '"
+ String cfName = cfDef.getName();
+ if
(!m_snapshot.get(keyspace).contains(cfName)) {
+ m_logService.log(LogService.LOG_DEBUG,
+ "Adding ColumnFamily '" +
cfDef.getName() + "' to keyspace '"
+ + keyspace + "' for the
keyspace-global ColumnFamilyProvider '"
+
provider.getClass().getName() + "'");
-
m_daemonService.addColumnFamily(keyspace, cfDef.getCfDef());
- }
- }
- }
- }
- }
- }
- }
- }
-
- private boolean isKeyspaceGlobal(final ColumnFamilyProvider provider) {
- for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
- if (cfDef.getKeyspaces() == null) {
- return true;
- }
- }
- return false;
- }
-
- // Loop over all Cassandra Persistence Managers and unregister them if
the corresponding keyspace
- // is removed from Cassandra
- private void onKeyspaceDropped() throws InvalidSyntaxException,
TException, InvalidRequestException {
- Set<String> keyspaces = m_snapshot.keySet();
- ServiceReference[] servRefs =
-
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
null);
- if (servRefs != null) {
- for (ServiceReference servRef : servRefs) {
- String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
- if (keyspaces == null || !keyspaces.contains(keyspace)) {
- // No keyspace available for this Cassandra
persistence manager, unregister the service now
- m_context.ungetService(servRef);
- m_logService.log(LogService.LOG_INFO, "Cassandra
Persistence Manager service for keyspace '"
- + keyspace + "' unregistered");
- }
- }
- }
- }
-
- private void onColumnFamilyAdded() throws TException,
InvalidRequestException, InvalidSyntaxException,
- NotFoundException {
- Set<String> keyspaces = m_snapshot.keySet();
- if (keyspaces != null) {
- for (String keyspace : keyspaces) {
- List<String> columnFamilies = m_snapshot.get(keyspace);
- for (String columnFamily : columnFamilies) {
- String ksFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
- String cfFilter = "(" +
ColumnFamilyAvailable.FILTER_NAME + "=" + columnFamily + ")";
- String filter = "(&" + ksFilter + cfFilter + ")";
- ServiceReference[] servRefs =
-
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(),
filter);
- if (servRefs == null || servRefs.length == 0) {
- registerColumnamilyAvailableService(keyspace,
columnFamily);
- }
- }
- }
- }
- }
-
- private void onColumnFamilyRemoved() throws TException,
InvalidRequestException, InvalidSyntaxException,
- NotFoundException {
- ServiceReference[] servRefs =
-
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(), null);
- if (servRefs != null) {
- for (ServiceReference servRef : servRefs) {
- String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
- String columnFamily =
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
- if (m_snapshot.get(keyspace) == null ||
!m_snapshot.get(keyspace).contains(columnFamily)) {
- if (m_componentMap.containsKey(columnFamily)) {
-
m_dependencyManager.remove(m_componentMap.get(columnFamily));
- m_componentMap.remove(columnFamily);
- m_logService.log(LogService.LOG_INFO,
"ColumnFamilyAvailable service for keyspace '"
- + keyspace
- + "' and ColumnFamily '" + columnFamily + "'
unregistered");
- }
- }
- }
- }
- }
-
- private void registerColumnamilyAvailableService(final String
keyspace, final String columnFamily) {
- Dictionary<String, String> serviceProps = new Hashtable<String,
String>();
- serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspace);
- serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
-
- Component component = m_dependencyManager.createComponent()
- .setInterface(ColumnFamilyAvailable.class.getName(),
serviceProps)
- .setImplementation(ColumnFamilyAvailableImpl.class);
-
- m_dependencyManager.add(component);
- m_componentMap.put(columnFamily, component);
- m_logService.log(LogService.LOG_INFO, "ColumnFamilyAvailable
service for keyspace '" + keyspace
- + "' and ColumnFamily '" + columnFamily + "' registered");
- }
- }
-}
+
m_daemonService.addColumnFamily(keyspace, cfDef.getCfDef());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isKeyspaceGlobal(final ColumnFamilyProvider provider) {
+ for (ColumnFamilyDefinition cfDef : provider.getColumnFamilies()) {
+ if (cfDef.getKeyspaces() == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Loop over all Cassandra Persistence Managers and unregister them if
the corresponding keyspace
+ // is removed from Cassandra
+ private void onKeyspaceDropped() throws InvalidSyntaxException,
TException, InvalidRequestException {
+ Set<String> keyspaces = m_snapshot.keySet();
+ ServiceReference[] servRefs =
+
m_context.getAllServiceReferences(CassandraPersistenceManager.class.getName(),
null);
+ if (servRefs != null) {
+ for (ServiceReference servRef : servRefs) {
+ String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+ if (keyspaces == null || !keyspaces.contains(keyspace)) {
+ // No keyspace available for this Cassandra
persistence manager, unregister the service now
+ m_context.ungetService(servRef);
+ m_logService.log(LogService.LOG_INFO, "Cassandra
Persistence Manager service for keyspace '"
+ + keyspace + "' unregistered");
+ }
+ }
+ }
+ }
+
+ private void onColumnFamilyAdded() throws TException,
InvalidRequestException, InvalidSyntaxException,
+ NotFoundException {
+ Set<String> keyspaces = m_snapshot.keySet();
+ if (keyspaces != null) {
+ for (String keyspace : keyspaces) {
+ List<String> columnFamilies = m_snapshot.get(keyspace);
+ for (String columnFamily : columnFamilies) {
+ String ksFilter = "(" +
CassandraPersistenceManager.KEYSPACE_AWARE_KEY + "=" + keyspace + ")";
+ String cfFilter = "(" +
ColumnFamilyAvailable.FILTER_NAME + "=" + columnFamily + ")";
+ String filter = "(&" + ksFilter + cfFilter + ")";
+ ServiceReference[] servRefs =
+
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(),
filter);
+ if (servRefs == null || servRefs.length == 0) {
+ registerColumnamilyAvailableService(keyspace,
columnFamily);
+ }
+ }
+ }
+ }
+ }
+
+ private void onColumnFamilyRemoved() throws TException,
InvalidRequestException, InvalidSyntaxException,
+ NotFoundException {
+ ServiceReference[] servRefs =
+
m_context.getAllServiceReferences(ColumnFamilyAvailable.class.getName(), null);
+ if (servRefs != null) {
+ for (ServiceReference servRef : servRefs) {
+ String keyspace =
servRef.getProperty(CassandraPersistenceManager.KEYSPACE_AWARE_KEY).toString();
+ String columnFamily =
servRef.getProperty(ColumnFamilyAvailable.FILTER_NAME).toString();
+ if (m_snapshot.get(keyspace) == null ||
!m_snapshot.get(keyspace).contains(columnFamily)) {
+ if (m_componentMap.containsKey(columnFamily)) {
+
m_dependencyManager.remove(m_componentMap.get(columnFamily));
+ m_componentMap.remove(columnFamily);
+ m_logService.log(LogService.LOG_INFO,
"ColumnFamilyAvailable service for keyspace '"
+ + keyspace
+ + "' and ColumnFamily '" + columnFamily + "'
unregistered");
+ }
+ }
+ }
+ }
+ }
+
+ private void registerColumnamilyAvailableService(final String
keyspace, final String columnFamily) {
+ Dictionary<String, String> serviceProps = new Hashtable<String,
String>();
+ serviceProps.put(CassandraPersistenceManager.KEYSPACE_AWARE_KEY,
keyspace);
+ serviceProps.put(ColumnFamilyAvailable.FILTER_NAME, columnFamily);
+
+ Component component = m_dependencyManager.createComponent()
+ .setInterface(ColumnFamilyAvailable.class.getName(),
serviceProps)
+ .setImplementation(ColumnFamilyAvailableImpl.class);
+
+ m_dependencyManager.add(component);
+ m_componentMap.put(columnFamily, component);
+ m_logService.log(LogService.LOG_INFO, "ColumnFamilyAvailable
service for keyspace '" + keyspace
+ + "' and ColumnFamily '" + columnFamily + "' registered");
+ }
+ }
+}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits