Author: ivol37 at gmail.com
Date: Fri Jan 14 13:52:01 2011
New Revision: 630
Log:
[AMDATU-266] Replaced the cassandra Avro daemon by the Thrift daemon. Starting
the daemon is moved to a separate thread as the Thrift client blocks the thread
when it is started. Also added logic to test if the daemon is started by
establishing a Thrift connection to the daemon. With this patch applied the
cassandra-cli is working again; it starts up without throwing OOM errors.
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
Fri Jan 14 13:52:01 2011
@@ -65,6 +65,21 @@
public static final String AUTOBOOTSTRAP_MODE = "autobootstrap";
/**
+ * Configuration key for the listen address
+ */
+ public static final String LISTEN_ADDRESS = "listen_address";
+
+ /**
+ * Configuration key for the RPC address
+ */
+ public static final String RPC_ADDRESS = "rpc_address";
+
+ /**
+ * Configuration key for the RPC port
+ */
+ public static final String RPC_PORT = "rpc_port";
+
+ /**
* Configuration key that stores a list of IP addresses that are part of
this cluster. The IP addresses
* are stored comma separated in ConfigurationAdmin.
*/
@@ -113,4 +128,16 @@
* @return true if auto bootstrap is enabled, false otherwise
*/
boolean isAutoBootstrapMode();
+
+ /**
+ * Returns the RPC address to be used by Thrift clients.
+ * @return the RPC address.
+ */
+ String getRPCAddress();
+
+ /**
+ * Returns the RPC port to be used by Thrift clients.
+ * @return the RPC port.
+ */
+ int getRPCPort();
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
Fri Jan 14 13:52:01 2011
@@ -54,6 +54,8 @@
private ConsistencyLevel m_readConsistencyLevel;
private ConsistencyLevel m_writeConsistencyLevel;
private boolean m_bootstrapMode;
+ private String m_rpcAddress;
+ private int m_rpcPort;
/**
* The init() method is invoked by the Felix dependency manager. It allows
us to initialize our service. In this
@@ -111,6 +113,11 @@
m_readConsistencyLevel =
ConsistencyLevel.valueOf(dictionary.get(READ_CONSISTENCY_LEVEL).toString());
m_writeConsistencyLevel =
ConsistencyLevel.valueOf(dictionary.get(WRITE_CONSISTENCY_LEVEL).toString());
m_bootstrapMode =
dictionary.get(AUTOBOOTSTRAP_MODE).toString().equalsIgnoreCase("true");
+ m_rpcAddress = dictionary.get(RPC_ADDRESS).toString();
+ if (m_rpcAddress.isEmpty()) {
+ m_rpcAddress = dictionary.get(LISTEN_ADDRESS).toString();
+ }
+ m_rpcPort = Integer.parseInt(dictionary.get(RPC_PORT).toString());
}
}
@@ -147,4 +154,12 @@
public boolean isAutoBootstrapMode() {
return m_bootstrapMode;
}
+
+ public String getRPCAddress() {
+ return m_rpcAddress;
+ }
+
+ public int getRPCPort() {
+ return m_rpcPort;
+ }
}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Fri Jan 14 13:52:01 2011
@@ -23,14 +23,21 @@
import org.amdatu.cassandra.application.CassandraConfigurationService;
import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.avro.CassandraDaemon;
import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.log.LogService;
@@ -44,13 +51,6 @@
// The default placement strategy
private final String DEFAULT_PLACEMENT_STRATEGY =
"org.apache.cassandra.locator.SimpleStrategy";
- // The amount of milliseconds to wait after starting the Cassandra daemon.
This gives the Cassandra the
- // time to synchronize its data with other nodes in the cluster and/or
perform any local migrations.
- // If we wouldn't wait, this node will start adding keyspaces and
columnfamilies while it is also
- // synchronizing the same keyspaces and columnfamily's with other nodes in
the cluster. This significantly
- // increases the risk of synchronization errors and so we just wait for
this amount of time before continuing.
- private final static int CASSANDRA_HALT_TIMEOUT = 5000;
-
// Service dependencies, injected by the framework
private volatile LogService m_logService;
private volatile EventAdmin m_eventAdmin;
@@ -88,22 +88,25 @@
if (m_daemonHasShutdown) {
throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
}
- m_daemon.activate();
- m_cassandraServer = new CassandraServer();
+ // Activate the daemon from a separate thread, as the activate()
method never returns
+ new CassandraDaemonActivateThread().start();
try {
- // Wait CASSANDRA_HALT_TIMEOUT seconds. This allows this Cassandra
node to synchronize its data
- // with other nodes in the cluster, before we start initializing
our bundles which potentially
- // try to add keyspaces and columnfamilies
- Thread.sleep(CASSANDRA_HALT_TIMEOUT);
- }
- catch (InterruptedException e) {
+ // Now establish a Thrift connection to the daemon, the connection
is established as soon
+ // as the daemon is running.
+ testThriftConnection();
+
+ // Create the cassandra server
+ m_cassandraServer = new CassandraServer();
+
+ m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started
with configuration: ");
+ m_logService.log(LogService.LOG_INFO, "> Default replication
factor: " + m_configuration.getDefaultReplicationFactor());
+ m_logService.log(LogService.LOG_INFO, "> Read consistency level: "
+ m_configuration.getReadConsistencyLevel());
+ m_logService.log(LogService.LOG_INFO, "> Write consistency level:
" + m_configuration.getWriteConsistencyLevel());
+ }
+ catch (TTransportException e) {
+ m_logService.log(LogService.LOG_INFO, "Could not establish a
Thrift connection to the Cassandra Daemon", e);
}
-
- m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started with
configuration: ");
- m_logService.log(LogService.LOG_INFO, "> Default replication factor: "
+ m_configuration.getDefaultReplicationFactor());
- m_logService.log(LogService.LOG_INFO, "> Read consistency level: " +
m_configuration.getReadConsistencyLevel());
- m_logService.log(LogService.LOG_INFO, "> Write consistency level: " +
m_configuration.getWriteConsistencyLevel());
}
public void stop() {
@@ -176,6 +179,11 @@
}
public synchronized void setReplicationFactor(int replicationFactor)
throws InvalidRequestException, TException {
+ int clusterSize = StorageService.instance.getLiveNodes().size();
+ if (replicationFactor > clusterSize) {
+ throw new InvalidRequestException("Replication factor " +
replicationFactor + " may never exceed the amount of nodes " +
+ "in the cluster, which currently is " + clusterSize);
+ }
List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
for (KsDef keyspace : keyspaces) {
if (keyspace.getReplication_factor() != replicationFactor) {
@@ -270,4 +278,26 @@
}
return false;
}
+
+ class CassandraDaemonActivateThread extends Thread {
+ public void run() {
+ m_daemon.activate();
+ }
+ }
+
+ /**
+ * This method opens a Thrift connection to the Cassandra daemon and
returns if the connection
+ * has been established. This is useful to ensure that the daemon is
running before continuing.
+ * @throws TTransportException
+ */
+ private void testThriftConnection() throws TTransportException {
+ String thrift = m_configuration.getRPCAddress() + ":" +
m_configuration.getRPCPort();
+ m_logService.log(LogService.LOG_INFO, "Establishing Thrift connection
to the Cassandra Daemon on " + thrift);
+ TTransport tr = new TSocket(m_configuration.getRPCAddress(),
m_configuration.getRPCPort());
+ TProtocol proto = new TBinaryProtocol(tr);
+ new Cassandra.Client(proto);
+ tr.open();
+ m_logService.log(LogService.LOG_INFO, "Thrift connection established
successfully");
+ }
+
}