Author: ivol37 at gmail.com
Date: Fri Jan 14 13:52:01 2011
New Revision: 630

Log:
[AMDATU-266] Replaced the cassandra Avro daemon by the Thrift deamon. Starting 
the daemon is moved to a separate thread as the Thrift client blocks the thread 
when it is started. Also added logic to test if the daemon is started by 
establishing a Thrift connection to the daemon. With this patch applied the 
cassandra-cli is working again; it starts up without throwing OOM errors.

Modified:
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
   
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/CassandraConfigurationService.java
      Fri Jan 14 13:52:01 2011
@@ -65,6 +65,21 @@
     public static final String AUTOBOOTSTRAP_MODE = "autobootstrap";
 
     /**
+     * Configuration key for the listen address
+     */
+    public static final String LISTEN_ADDRESS = "listen_address";
+
+    /**
+     * Configuration key for the RPC address
+     */
+    public static final String RPC_ADDRESS = "rpc_address";
+
+    /**
+     * Configuration key for the RPC port
+     */
+    public static final String RPC_PORT = "rpc_port";
+
+    /**
      * Configuration key that stores a list of IP addresses that are part of 
this cluster. The IP addresses
      * are stored comma separated in ConfigurationAdmin.
      */
@@ -113,4 +128,16 @@
      * @return true if auto bootstrap is enabled, false otherwise
      */
     boolean isAutoBootstrapMode();
+
+    /**
+     * Returns the RPC address to be used by Thrift clients.
+     * @return the RPC adrdess.
+     */
+    String getRPCAddress();
+
+    /**
+     * Returns the RPC port to be used by Thrift clients.
+     * @return the RPC port.
+     */
+    int getRPCPort();
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraConfigurationServiceImpl.java
  Fri Jan 14 13:52:01 2011
@@ -54,6 +54,8 @@
     private ConsistencyLevel m_readConsistencyLevel;
     private ConsistencyLevel m_writeConsistencyLevel;
     private boolean m_bootstrapMode;
+    private String m_rpcAddress;
+    private int m_rpcPort;
 
     /**
      * The init() method is invoked by the Felix dependency manager. It allows 
us to initialize our service. In this
@@ -111,6 +113,11 @@
             m_readConsistencyLevel = 
ConsistencyLevel.valueOf(dictionary.get(READ_CONSISTENCY_LEVEL).toString());
             m_writeConsistencyLevel = 
ConsistencyLevel.valueOf(dictionary.get(WRITE_CONSISTENCY_LEVEL).toString());
             m_bootstrapMode = 
dictionary.get(AUTOBOOTSTRAP_MODE).toString().equalsIgnoreCase("true");
+            m_rpcAddress = dictionary.get(RPC_ADDRESS).toString();
+            if (m_rpcAddress.isEmpty()) {
+                m_rpcAddress = dictionary.get(LISTEN_ADDRESS).toString();
+            }
+            m_rpcPort = Integer.parseInt(dictionary.get(RPC_PORT).toString());
         }
     }
 
@@ -147,4 +154,12 @@
     public boolean isAutoBootstrapMode() {
         return m_bootstrapMode;
     }
+
+    public String getRPCAddress() {
+        return m_rpcAddress;
+    }
+
+    public int getRPCPort() {
+        return m_rpcPort;
+    }
 }

Modified: 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 (original)
+++ 
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
 Fri Jan 14 13:52:01 2011
@@ -23,14 +23,21 @@
 
 import org.amdatu.cassandra.application.CassandraConfigurationService;
 import org.amdatu.cassandra.application.CassandraDaemonService;
-import org.apache.cassandra.avro.CassandraDaemon;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraDaemon;
 import org.apache.cassandra.thrift.CassandraServer;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KsDef;
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
 import org.osgi.service.log.LogService;
@@ -44,13 +51,6 @@
     // The default placement strategy
     private final String DEFAULT_PLACEMENT_STRATEGY = 
"org.apache.cassandra.locator.SimpleStrategy";
 
-    // The amount of milliseconds to wait after starting the Cassandra daemon. 
This gives the Cassandra the
-    // time to synchronize its data with other nodes in the cluster and/or 
perform any local migrations.
-    // If we wouldn't wait, this node will start adding keyspaces and 
columnfamilies while it is also
-    // synchronizing the same keyspaces and columnfamily's with other nodes in 
the cluster. This significantly
-    // increases the risk of synchronization errors and so we just wait for 
this amount of time before continuing.
-    private final static int CASSANDRA_HALT_TIMEOUT = 5000;
-
     // Service dependencies, injected by the framework
     private volatile LogService m_logService;
     private volatile EventAdmin m_eventAdmin;
@@ -88,22 +88,25 @@
         if (m_daemonHasShutdown) {
             throw new RuntimeException("CassandraDaemon has already been 
shutdown and cannot be restarted.");
         }
-        m_daemon.activate();
-        m_cassandraServer = new CassandraServer();
 
+        // Activate the daemon from a separate thread, as the activate() 
method never returns
+        new CassandraDaemonActivateThread().start();
         try {
-            // Wait CASSANDRA_HALT_TIMEOUT seconds. This allows this Cassandra 
node to synchronize its data
-            // with other nodes in the cluster, before we start initializing 
our bundles which potentially
-            // try to add keyspaces and columnfamilies
-            Thread.sleep(CASSANDRA_HALT_TIMEOUT);
-        }
-        catch (InterruptedException e) {
+            // Now establish a Thrift connection to the daemon, the connection 
is established as soon
+            // as the daemon is running.
+            testThriftConnection();
+
+            // Create the cassandra server
+            m_cassandraServer = new CassandraServer();
+
+            m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started 
with configuration: ");
+            m_logService.log(LogService.LOG_INFO, "> Default replication 
factor: " + m_configuration.getDefaultReplicationFactor());
+            m_logService.log(LogService.LOG_INFO, "> Read consistency level: " 
+ m_configuration.getReadConsistencyLevel());
+            m_logService.log(LogService.LOG_INFO, "> Write consistency level: 
" + m_configuration.getWriteConsistencyLevel());
+        }
+        catch (TTransportException e) {
+            m_logService.log(LogService.LOG_INFO, "Could not establish a 
Thrift connection to the Cassandra Daemon", e);
         }
-
-        m_logService.log(LogService.LOG_INFO, "Cassandra Daemon started with 
configuration: ");
-        m_logService.log(LogService.LOG_INFO, "> Default replication factor: " 
+ m_configuration.getDefaultReplicationFactor());
-        m_logService.log(LogService.LOG_INFO, "> Read consistency level: " + 
m_configuration.getReadConsistencyLevel());
-        m_logService.log(LogService.LOG_INFO, "> Write consistency level: " + 
m_configuration.getWriteConsistencyLevel());
     }
 
     public void stop() {
@@ -176,6 +179,11 @@
     }
 
     public synchronized void setReplicationFactor(int replicationFactor) 
throws InvalidRequestException, TException {
+        int clusterSize = StorageService.instance.getLiveNodes().size();
+        if (replicationFactor > clusterSize) {
+            throw new InvalidRequestException("Replication factor " + 
replicationFactor + " may never exceed the amount of nodes " +
+                "in the cluster, which currently is " + clusterSize);
+        }
         List<KsDef> keyspaces = m_cassandraServer.describe_keyspaces();
         for (KsDef keyspace : keyspaces) {
             if (keyspace.getReplication_factor() != replicationFactor) {
@@ -270,4 +278,26 @@
         }
         return false;
     }
+
+    class CassandraDaemonActivateThread extends Thread {
+        public void run() {
+            m_daemon.activate();
+        }
+    }
+
+    /**
+     * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
+     * has been established. This is usefull to ensure that the daemon is 
running before continuing.
+     * @throws TTransportException
+     */
+    private void testThriftConnection() throws TTransportException {
+        String thrift = m_configuration.getRPCAddress() + ":" + 
m_configuration.getRPCPort();
+        m_logService.log(LogService.LOG_INFO, "Establishing Thrift connection 
to the Cassandra Daemon on " + thrift);
+        TTransport tr = new TSocket(m_configuration.getRPCAddress(), 
m_configuration.getRPCPort());
+        TProtocol proto = new TBinaryProtocol(tr);
+        new Cassandra.Client(proto);
+        tr.open();
+        m_logService.log(LogService.LOG_INFO, "Thrift connection established 
successfully");
+    }
+
 }

Reply via email to