Author: [email protected]
Date: Fri Feb 17 14:57:14 2012
New Revision: 2100

Log:
[AMDATUCASSANDRA-164] Reintroduced the separate thread, as waiting for the 
daemon without it causes a deadlock when client and server run in the same VM: 
the start method blocks until the daemon is available, but the same thread 
starts all bundles.


Modified:
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
  Fri Feb 17 14:57:14 2012
@@ -15,19 +15,14 @@
  */
 package org.amdatu.cassandra.client.service;
 
-import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.HOSTS;
-import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.PORT;
+import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.CLUSTER_NAME;
+import me.prettyprint.cassandra.service.ThriftCluster;
+import me.prettyprint.hector.api.Cluster;
 
 import org.amdatu.cassandra.client.CassandraDaemonAvailable;
 
-import org.apache.cassandra.thrift.Cassandra;
 import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.osgi.service.log.LogService;
 
 /**
@@ -43,7 +38,7 @@
  */
 public class CassandraClientActivatorServiceImpl {
     // Timeout for the initial Thrift connection
-    private static final int THRIFT_RETRIES = 3;
+    private static final int THRIFT_RETRIES = 5;
     private static final int THRIFT_TIMEOUT = 3000;
 
     // Service dependencies injected by the dependency manager
@@ -51,77 +46,69 @@
     private volatile LogService m_logService;
     private volatile CassandraClientConfigurationService m_configuration;
 
+    private static CassandraDaemonListenerThread m_activatorThread = null;
     private Component m_serviceAvailableComponent = null;
 
-    public synchronized void start() {
-        // Test the Thrift connection and register a 
CassandraDaemonAvailableService
-        // as soon as a Thrift connection could be established
-        registerAvailableServiec();
-    }
-
-    public synchronized void stop() {
-        // Unregister the CassandraDaemonAvailable service component
-        if (m_serviceAvailableComponent != null) {
-            m_dependencyManager.remove(m_serviceAvailableComponent);
+    public void start() {
+        // We start the activator thread only once during the lifetime of this 
bundle classloader.
+        // It is only intended to hold the system until the Cassandra host is 
available, this may
+        // take some time in case the Cassandra server runs inside this very 
same container.
+        if (m_activatorThread == null) {
+            // Start a new daemon listener thread. It will register a 
CassandraDaemonAvailableService
+            // as soon as Hector can successfully create a connection to the 
Cassandra server
+            m_activatorThread = new CassandraDaemonListenerThread();
+            m_activatorThread.start();
         }
     }
 
-    private void registerAvailableServiec() {
-        try {
+    class CassandraDaemonListenerThread extends Thread {
+        public void run() {
             // First wait until we can establish a Thrift connection to the 
daemon, the connection is
             // established as soon as the daemon is running.
-            testThriftConnection();
+            if (waitForConnection()) {
 
-            // Register a new CassandraDaemonAvailable service
-            CassandraDaemonAvailable service = new CassandraDaemonAvailable() {
-            };
-
-            m_serviceAvailableComponent = m_dependencyManager.createComponent()
-                            
.setInterface(CassandraDaemonAvailable.class.getName(), null)
-                            .setImplementation(service);
-            m_dependencyManager.add(m_serviceAvailableComponent);
-        }
-        catch (TTransportException e) {
-            m_logService.log(LogService.LOG_ERROR, "Could not establish Thrift 
connection to Cassandra daemon, "
-                            + "daemon could not be started.");
+                // Register a new CassandraDaemonAvailable service
+                CassandraDaemonAvailable service = new 
CassandraDaemonAvailable() {
+                };
+
+                m_serviceAvailableComponent = 
m_dependencyManager.createComponent()
+                    .setInterface(CassandraDaemonAvailable.class.getName(), 
null)
+                    .setImplementation(service);
+                m_dependencyManager.add(m_serviceAvailableComponent);
+            }
         }
     }
 
-    /**
-     * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
-     * has been established. This is useful to ensure that the daemon is 
running before continuing.
-     * 
-     * @throws TTransportException
-     */
-    private void testThriftConnection() throws TTransportException {
+    private boolean waitForConnection() {
+        // Try THRIFT_RETRIES times to get a connection
         int retry = 0;
+        Cluster cluster = null;
+        ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
         try {
-            String thrift =
-                            m_configuration.get(HOSTS, String.class) + ":" + 
m_configuration.get(PORT, Integer.class);
-            m_logService.log(LogService.LOG_INFO, "Establishing Thrift 
connection to the Cassandra Daemon on "
-                            + thrift);
-            TTransport tr =
-                            new TSocket(m_configuration.get(HOSTS, 
String.class), m_configuration.get(PORT, Integer.class),
-                                THRIFT_TIMEOUT);
-            TProtocol proto = new TBinaryProtocol(tr);
-            new Cassandra.Client(proto);
-            tr.open();
-        }
-        catch (TTransportException e) {
-            retry++;
-            if (retry >= THRIFT_RETRIES) {
-                throw e;
-            }
-            try {
-                Thread.sleep(THRIFT_TIMEOUT);
-            }
-            catch (InterruptedException e1) {
-                m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift 
connection interrupted");
+            while (retry < THRIFT_RETRIES) {
+                try {
+                    // Perform a classloader switch to prevent log4j trying to 
load classes from the system classloader
+                    // instead of the bundle classloader.
+                    
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                    String clusterName = m_configuration.get(CLUSTER_NAME, 
String.class);
+                    cluster = new ThriftCluster(clusterName, 
m_configuration.getCassandraHostConfigurator());
+                    cluster.describeKeyspaces();
+                    m_logService.log(LogService.LOG_INFO, "Thrift connection 
established successfully");
+                    return true;
+                }
+                catch (Exception e) {
+                    retry++;
+                    Thread.sleep(THRIFT_TIMEOUT);
+                }
+                finally {
+                    cluster.getConnectionManager().shutdown();
+                }
             }
-            m_logService.log(LogService.LOG_INFO, "Thrift connection cannot 
yet be established, retrying... ("
-                            + retry + ")");
         }
-        m_logService.log(LogService.LOG_INFO, "Thrift connection established 
successfully");
+        catch (Exception e) {}
+        finally {
+            Thread.currentThread().setContextClassLoader(oldClassLoader);
+        }
+        return false;
     }
-
 }
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to