Author: ivol37 at gmail.com
Date: Fri Jan 7 15:43:30 2011
New Revision: 579
Log:
[AMDATU-243] Several clustering related bugfixes:
- The CassandraDaemon service now waits for 5 seconds immediately after
activating the cassandra daemon. This gives the daemon the time to synchronize
data with other nodes in the cluster. Without this timeout, the synchronization
will happen while Amdatu adds keyspaces and columnfamilies since the casasndra
db is empty upon first startup. This caused many sync issues, which are
vanished after adding this timeout.
- Changed the READ consistency level to ONE. With ALL, a two-node cluster goes
down whenever one of the two nodes goes down.
- Small bugfix in gadget management service triggered by clustered setup
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
trunk/amdatu-opensocial/gadgetmanagement/src/main/java/org/amdatu/opensocial/gadgetmanagement/service/GadgetManagementServiceImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Fri Jan 7 15:43:30 2011
@@ -42,9 +42,15 @@
public class CassandraDaemonServiceImpl implements CassandraDaemonService {
// The default placement strategy
private final String DEFAULT_PLACEMENT_STRATEGY =
"org.apache.cassandra.locator.SimpleStrategy";
-
private final int DEFAULT_REPLICATION_FACTOR = 1;
+ // The amount of milliseconds to wait after starting the Cassandra daemon.
This gives the Cassandra the
+ // time to synchronize its data with other nodes in the cluster and/or
perform any local migrations.
+ // If we wouldn't wait, this node will start adding keyspaces and
columnfamilies while it is also
+ // synchronizing the same keyspaces and columnfamily's with other nodes in
the cluster. This significantly
+ // increases the risk of synchronization errors and so we just wait for
this amount of time before continuing.
+ private final int CASSANDRA_HALT_TIMEOUT = 5000;
+
// Service dependencies, injected by the framework
private volatile LogService m_logService;
private volatile EventAdmin m_eventAdmin;
@@ -84,6 +90,16 @@
}
m_daemon.activate();
m_cassandraServer = new CassandraServer();
+
+ try {
+ // Wait CASSANDRA_HALT_TIMEOUT seconds. This allows this Cassandra
node to synchronize its data
+ // with other nodes in the cluster, before we start initializing
our bundles which potentially
+ // try to add keyspaces and columnfamilies
+ Thread.sleep(CASSANDRA_HALT_TIMEOUT);
+ }
+ catch (InterruptedException e) {
+ }
+
m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started.");
}
@@ -103,17 +119,17 @@
return m_cassandraServer;
}
- public boolean keyspaceExists(String keyspaceName) throws TException,
InvalidRequestException {
- List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
- for (KsDef keyspace : keyspaces) {
- if (keyspace.getName().equals(keyspaceName)) {
- return true;
- }
+ public synchronized boolean keyspaceExists(String keyspaceName) throws
TException, InvalidRequestException {
+ try {
+ m_cassandraServer.describe_keyspace(keyspaceName);
+ return true;
+ }
+ catch (NotFoundException e) {
+ return false;
}
- return false;
}
- public List<String> getKeyspaces() throws TException,
InvalidRequestException {
+ public synchronized List<String> getKeyspaces() throws TException,
InvalidRequestException {
List<String> keyspaceNames = new ArrayList<String>();
List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
for (KsDef keyspace : keyspaces) {
@@ -126,7 +142,19 @@
if (!keyspaceExists(name)) {
List<CfDef> empty = new ArrayList<CfDef>();
KsDef ksDef = new KsDef(name, DEFAULT_PLACEMENT_STRATEGY,
DEFAULT_REPLICATION_FACTOR, empty);
- m_cassandraServer.system_add_keyspace(ksDef);
+ try {
+ m_cassandraServer.system_add_keyspace(ksDef);
+ } catch (InvalidRequestException e) {
+ // Now this error may appear if some other node in the cluster
added this keyspace
+ // in the meantime and this keyspace was synchornized to this
node. So verify if the
+ // keyspace exists after all, and if so, return without
throwing an exception
+ if (keyspaceExists(name)) {
+ m_logService.log(LogService.LOG_DEBUG, "Keyspace '" + name
+ "' was not added since it already existed");
+ return false;
+ } else {
+ throw e;
+ }
+ }
// Publish an event that a new keyspace has been added
Map<String, String> properties = new HashMap<String, String>();
Modified:
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-persistencemanager/src/main/java/org/amdatu/cassandra/persistencemanager/service/CassandraPersistenceManagerImpl.java
Fri Jan 7 15:43:30 2011
@@ -65,9 +65,11 @@
// Empty byte array
private final static ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
- // The consistency level to use for READ operations
- private final static ConsistencyLevel READ_CONSISTENCY_LEVEL =
ConsistencyLevel.ALL;
-
+ // The consistency level to use for READ operations. Note that if we
define QUORUM or ALL and the
+ // cluster only consists of two nodes; all READ operations will return a
timeout as soon as one
+ // node goes down.
+ private final static ConsistencyLevel READ_CONSISTENCY_LEVEL =
ConsistencyLevel.ONE;
+
// The consistency level to use for WRITE operations
private final static ConsistencyLevel WRITE_CONSISTENCY_LEVEL =
ConsistencyLevel.ONE;
@@ -162,7 +164,7 @@
}
public boolean exists(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnPath columnPath = new ColumnPath(columnFamilyName);
columnPath.setSuper_column(toBytes(superColumnName));
@@ -269,7 +271,7 @@
}
public Map<String, Map<String, Map<String, byte[]>>> getSuperValues(String
columnFamilyName)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
@@ -296,7 +298,7 @@
}
public Map<String, Map<String, byte[]>> getSuperValues(String
columnFamilyName, String rowKey)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
@@ -329,7 +331,7 @@
}
public Map<String, Map<String, String>> getSuperStringValues(String
columnFamilyName, String rowKey)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
SlicePredicate p = new SlicePredicate();
@@ -362,7 +364,7 @@
}
public byte[] getValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnPath columnPath = new ColumnPath(columnFamilyName);
columnPath.setSuper_column(toBytes(superColumnName));
@@ -391,7 +393,7 @@
}
public String getStringValue(String columnFamilyName, String rowKey,
String superColumnName, String columnName)
- throws CassandraException {
+ throws CassandraException {
try {
byte[] value = getValue(columnFamilyName, rowKey, superColumnName,
columnName);
if (value != null) {
@@ -414,7 +416,7 @@
}
public Map<String, byte[]> getValues(String columnFamilyName, String
rowKey, String superColumnName)
- throws CassandraException {
+ throws CassandraException {
try {
ColumnParent columnParent = new ColumnParent(columnFamilyName);
columnParent.setSuper_column(toBytes(superColumnName));
@@ -448,7 +450,7 @@
}
public Map<String, String> getStringValues(String columnFamilyName, String
rowKey, String superColumnName)
- throws CassandraException {
+ throws CassandraException {
try {
Map<String, byte[]> byteValues = getValues(columnFamilyName,
rowKey, superColumnName);
if (byteValues != null) {
@@ -475,7 +477,7 @@
}
public void setValue(String columnFamilyName, String rowKey, String
superColumnName, String columnName, byte[] value)
- throws CassandraException {
+ throws CassandraException {
try {
long timestamp = System.currentTimeMillis();
ColumnParent column_parent = new ColumnParent(columnFamilyName);
@@ -501,13 +503,13 @@
// significantly increases the probability that on a standalone
installation with a single thread getValue() will return the value
// that has previously been set using setValue()
public void setValueSynchronously(String columnFamilyName, String rowKey,
String superColumnName, String columnName, byte[] value)
- throws CassandraException {
+ throws CassandraException {
int retry = 0;
boolean success = false;
while (retry < MAX_RETRIES && !success) {
// First asynchronously set the value
setValue(columnFamilyName, rowKey, superColumnName, columnName,
value);
-
+
// Now verify the result
byte[] persistentByteValue = getValue(columnFamilyName, rowKey,
superColumnName, columnName);
success = Arrays.equals(value, persistentByteValue);
@@ -523,9 +525,9 @@
if (!success) {
try {
String errorMsg = "setValueSynchronously failed for
ColumnFamily '" + columnFamilyName + "', rowKey '" + rowKey
- + "', SuperColumn '" + superColumnName + "',
columnName '" + columnName + "', value '"
- + toString(value) + "'. See
http://jira.amdatu.org/jira/browse/AMDATU-137. Failed to correct the error "
- + "after retrying " + (retry - 1) + " times.";
+ + "', SuperColumn '" + superColumnName + "', columnName '" +
columnName + "', value '"
+ + toString(value) + "'. See
http://jira.amdatu.org/jira/browse/AMDATU-137. Failed to correct the error "
+ + "after retrying " + (retry - 1) + " times.";
throw new CassandraException(errorMsg);
}
catch (UnsupportedEncodingException e) {
@@ -567,12 +569,12 @@
}
public void deleteSuperColumn(String columnFamilyName, String rowKey,
String superColumnName)
- throws CassandraException {
+ throws CassandraException {
deleteColumn(columnFamilyName, rowKey, superColumnName, null);
}
public void deleteColumn(String columnFamilyName, String rowKey, String
superColumnName, String columnName)
- throws CassandraException {
+ throws CassandraException {
try {
long timestamp = System.currentTimeMillis();
ColumnPath columnPath = new ColumnPath(columnFamilyName);
@@ -603,7 +605,7 @@
}
public <T> T loadBean(Class<T> clazz, String id, String columnFamilyName,
String superColumnName)
- throws CassandraException {
+ throws CassandraException {
boolean exists = false;
T bean;
try {
@@ -636,7 +638,7 @@
catch (Exception e) {
if (!(e instanceof RuntimeException)) {
throw new
CassandraException(e).setKeyspace(m_keyspace).setColumnFamily(columnFamilyName)
- .setSuperColumn(superColumnName);
+ .setSuperColumn(superColumnName);
}
else {
throw (RuntimeException) e;
@@ -665,7 +667,7 @@
}
public void persistBean(Object bean, String id, String columnFamilyName,
String superColumnName)
- throws CassandraException {
+ throws CassandraException {
// FIXME: For now, just persist all bean properties for which getters
are available
Method[] methods = bean.getClass().getMethods();
for (Method method : methods) {
@@ -782,7 +784,7 @@
private String toString(byte[] bytes) throws UnsupportedEncodingException {
return new String(bytes, DEFAULT_CHARSET);
}
-
+
private List<ByteBuffer> toBytesList(List<byte[]> values) {
List<ByteBuffer> result = new ArrayList<ByteBuffer>();
for (byte[] value : values) {
Modified:
trunk/amdatu-opensocial/gadgetmanagement/src/main/java/org/amdatu/opensocial/gadgetmanagement/service/GadgetManagementServiceImpl.java
==============================================================================
---
trunk/amdatu-opensocial/gadgetmanagement/src/main/java/org/amdatu/opensocial/gadgetmanagement/service/GadgetManagementServiceImpl.java
(original)
+++
trunk/amdatu-opensocial/gadgetmanagement/src/main/java/org/amdatu/opensocial/gadgetmanagement/service/GadgetManagementServiceImpl.java
Fri Jan 7 15:43:30 2011
@@ -322,7 +322,7 @@
}
boolean generateStartId = false;
- if (gadgetIds == null) {
+ if (gadgetIds == null || gadgetIds.length == 0) {
generateStartId = true;
gadgetIds = getDefaultGadgetUrls(request);
}