Author: [email protected]
Date: Wed Feb 29 16:50:59 2012
New Revision: 2132
Log:
[AMDATUCASSANDRA-176] Added monitor for r/w locks on the Keyspace ->
ColumnFamily type map
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
Modified:
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
==============================================================================
---
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
(original)
+++
branches/amdatu-cassandra-0.2.3/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientServiceImpl.java
Wed Feb 29 16:50:59 2012
@@ -36,6 +36,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.cassandra.thrift.CfDef;
import org.osgi.service.event.Event;
@@ -61,7 +65,9 @@
private volatile CassandraClientConfigurationService m_configuration =
null;
// Local cache of ColumnTypes of all ColumnFamilies for each keyspace
- private Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes = new
HashMap<String, Map<String, String>>();
+ private final Map<String, Map<String, String>> m_keyspaceColumnFamilyTypes
=
+ new ConcurrentHashMap<String, Map<String, String>>();
+ private final ReentrantReadWriteLock m_ksCfMaplock = new
ReentrantReadWriteLock();
/**
* The init() method is invoked by the Felix dependency manager.
@@ -173,7 +179,7 @@
while (agreed == null && System.currentTimeMillis() < expires) {
m_logService.log(LogService.LOG_INFO,
"Schema definitions are not yet promulgated throughout the
cluster, waiting for schema agreement."
- + " Cluster schema's:" + getSchemaVersions());
+ + " Cluster schema's:" + getSchemaVersions());
try {
// Wait for 2 seconds, then try again
Thread.sleep(2000);
@@ -343,7 +349,15 @@
getHectorCluster().dropKeyspace(keyspace, true);
// Purge the keyspace from the keyspace -> CF type map
- m_keyspaceColumnFamilyTypes.remove(keyspace);
+ // Get a write lock before removing the keyspace from the map
+ WriteLock writeLock = m_ksCfMaplock.writeLock();
+ try {
+ writeLock.lock();
+ m_keyspaceColumnFamilyTypes.remove(keyspace);
+ }
+ finally {
+ writeLock.unlock();
+ }
// Publish an event that a keyspace has been dropped
Map<String, String> properties = new HashMap<String, String>();
@@ -384,34 +398,49 @@
return cfDef != null;
}
- public synchronized String getColumnType(final String keyspaceName, final
String columnFamilyName) {
+ public String getColumnType(final String keyspaceName, final String
columnFamilyName) {
Map<String, String> columnFamilyTypeMap;
- if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
- columnFamilyTypeMap =
m_keyspaceColumnFamilyTypes.get(keyspaceName);
- }
- else {
- columnFamilyTypeMap = new HashMap<String, String>();
+
+ // Get a read lock before getting the keyspace from the map
+ ReadLock readLock = m_ksCfMaplock.readLock();
+ try {
+ readLock.lock();
+ if (m_keyspaceColumnFamilyTypes.containsKey(keyspaceName)) {
+ columnFamilyTypeMap =
m_keyspaceColumnFamilyTypes.get(keyspaceName);
+ }
+ else {
+ columnFamilyTypeMap = new HashMap<String, String>();
+ }
+ if (columnFamilyTypeMap.containsKey(columnFamilyName)) {
+ return columnFamilyTypeMap.get(columnFamilyName);
+ }
}
- if (columnFamilyTypeMap.containsKey(columnFamilyName)) {
- return columnFamilyTypeMap.get(columnFamilyName);
+ finally {
+ readLock.unlock();
}
- else {
- try {
- KeyspaceDefinition ksDef =
getHectorCluster().describeKeyspace(keyspaceName);
- List<ColumnFamilyDefinition> cfDefs = ksDef.getCfDefs();
- for (ColumnFamilyDefinition cfDef : cfDefs) {
- if (cfDef.getName().equals(columnFamilyName)) {
- columnFamilyTypeMap.put(columnFamilyName,
cfDef.getColumnType().getValue());
- m_keyspaceColumnFamilyTypes.put(keyspaceName,
columnFamilyTypeMap);
- return cfDef.getColumnType().getValue();
- }
+
+ // Get a write lock before update the keyspace in the map
+ WriteLock writeLock = m_ksCfMaplock.writeLock();
+ try {
+ writeLock.lock();
+
+ KeyspaceDefinition ksDef =
getHectorCluster().describeKeyspace(keyspaceName);
+ List<ColumnFamilyDefinition> cfDefs = ksDef.getCfDefs();
+ for (ColumnFamilyDefinition cfDef : cfDefs) {
+ if (cfDef.getName().equals(columnFamilyName)) {
+ columnFamilyTypeMap.put(columnFamilyName,
cfDef.getColumnType().getValue());
+ m_keyspaceColumnFamilyTypes.put(keyspaceName,
columnFamilyTypeMap);
+ return cfDef.getColumnType().getValue();
}
- return null;
- }
- catch (HectorException e) {
- // Throw the unchecked Hector exception as a new unchecked
Thrift exception
- throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
}
+ return null;
+ }
+ catch (HectorException e) {
+ // Throw the unchecked Hector exception as a new unchecked Thrift
exception
+ throw new
ThriftException(e).setKeyspace(keyspaceName).setColumnFamily(columnFamilyName);
+ }
+ finally {
+ writeLock.unlock();
}
}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits