Author: [email protected]
Date: Fri Feb 17 11:06:59 2012
New Revision: 2097

Log:
[AMDATUCASSANDRA-164] Changed start/stop methods to synchronized and removed 
the separate Thread (which was not really necessary). This should avoid 
concurrency issues with the DM.

Modified:
   
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java

Modified: 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
  (original)
+++ 
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
  Fri Feb 17 11:06:59 2012
@@ -15,6 +15,9 @@
  */
 package org.amdatu.cassandra.client.service;
 
+import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.HOSTS;
+import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.PORT;
+
 import org.amdatu.cassandra.client.CassandraDaemonAvailable;
 
 import org.apache.cassandra.thrift.Cassandra;
@@ -27,8 +30,6 @@
 import org.apache.thrift.transport.TTransportException;
 import org.osgi.service.log.LogService;
 
-import static 
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.*;
-
 /**
  * This service is responsible for registration of a CassandraDaemonAvailable 
service in case
  * a successful Thrift connection could be made with the daemon and its 
operating mode is
@@ -50,84 +51,77 @@
     private volatile LogService m_logService;
     private volatile CassandraClientConfigurationService m_configuration;
 
-    private CassandraDaemonListenerThread m_activatorThread = null;
     private Component m_serviceAvailableComponent = null;
 
-    public void start() {
-        // Start a new daemon listener thread. It will register a 
CassandraDaemonAvailableService
-        // as soon as the daemon reached operation mode 'Normal' and a Thrift 
connection could be
-        // established
-        m_activatorThread = new CassandraDaemonListenerThread();
-        m_activatorThread.start();
+    public synchronized void start() {
+        // Test the Thrift connection and register a 
CassandraDaemonAvailableService
+        // as soon as a Thrift connection could be established
+        registerAvailableServiec();
     }
 
-    public void stop() {
-        // Interrupt the listener thread, if it is still running
-        if (m_activatorThread != null && m_activatorThread.isAlive()) {
-            m_activatorThread.interrupt();
-        }
-
-        // Unregister the CassandraDaemonAvailabkle service component
+    public synchronized void stop() {
+        // Unregister the CassandraDaemonAvailable service component
         if (m_serviceAvailableComponent != null) {
             m_dependencyManager.remove(m_serviceAvailableComponent);
         }
     }
 
-    class CassandraDaemonListenerThread extends Thread {
-        public void run() {
-            try {
-                // First wait until we can establish a Thrift connection to 
the daemon, the connection is
-                // established as soon as the daemon is running.
-                testThriftConnection();
-
-                // Register a new CassandraDaemonAvailable service
-                CassandraDaemonAvailable service = new 
CassandraDaemonAvailable() {
-                };
-
-                m_serviceAvailableComponent = 
m_dependencyManager.createComponent()
-                    .setInterface(CassandraDaemonAvailable.class.getName(), 
null)
-                    .setImplementation(service);
-                m_dependencyManager.add(m_serviceAvailableComponent);
-            }
-            catch (TTransportException e) {
-                m_logService.log(LogService.LOG_ERROR, "Could not establish 
Thrift connection to Cassandra daemon, "
-                    + "daemon could not be started.");
-            }
+    private void registerAvailableServiec() {
+        try {
+            // First wait until we can establish a Thrift connection to the 
daemon, the connection is
+            // established as soon as the daemon is running.
+            testThriftConnection();
+
+            // Register a new CassandraDaemonAvailable service
+            CassandraDaemonAvailable service = new CassandraDaemonAvailable() {
+            };
+
+            m_serviceAvailableComponent = m_dependencyManager.createComponent()
+                            
.setInterface(CassandraDaemonAvailable.class.getName(), null)
+                            .setImplementation(service);
+            m_dependencyManager.add(m_serviceAvailableComponent);
+        }
+        catch (TTransportException e) {
+            m_logService.log(LogService.LOG_ERROR, "Could not establish Thrift 
connection to Cassandra daemon, "
+                            + "daemon could not be started.");
         }
+    }
 
-        /**
-         * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
-         * has been established. This is useful to ensure that the daemon is 
running before continuing.
-         * 
-         * @throws TTransportException
-         */
-        private void testThriftConnection() throws TTransportException {
-            int retry = 0;
+    /**
+     * This methods opens a Thrift connection to the Cassandra daemon and 
returns if the connection
+     * has been established. This is useful to ensure that the daemon is 
running before continuing.
+     * 
+     * @throws TTransportException
+     */
+    private void testThriftConnection() throws TTransportException {
+        int retry = 0;
+        try {
+            String thrift =
+                            m_configuration.get(HOSTS, String.class) + ":" + 
m_configuration.get(PORT, Integer.class);
+            m_logService.log(LogService.LOG_INFO, "Establishing Thrift 
connection to the Cassandra Daemon on "
+                            + thrift);
+            TTransport tr =
+                            new TSocket(m_configuration.get(HOSTS, 
String.class), m_configuration.get(PORT, Integer.class),
+                                THRIFT_TIMEOUT);
+            TProtocol proto = new TBinaryProtocol(tr);
+            new Cassandra.Client(proto);
+            tr.open();
+        }
+        catch (TTransportException e) {
+            retry++;
+            if (retry >= THRIFT_RETRIES) {
+                throw e;
+            }
             try {
-                String thrift = m_configuration.get(HOSTS, String.class) + ":" 
+ m_configuration.get(PORT, Integer.class);
-                m_logService.log(LogService.LOG_INFO, "Establishing Thrift 
connection to the Cassandra Daemon on "
-                    + thrift);
-                TTransport tr = new TSocket(m_configuration.get(HOSTS, 
String.class), m_configuration.get(PORT, Integer.class),
-                    THRIFT_TIMEOUT);
-                TProtocol proto = new TBinaryProtocol(tr);
-                new Cassandra.Client(proto);
-                tr.open();
+                Thread.sleep(THRIFT_TIMEOUT);
             }
-            catch (TTransportException e) {
-                retry++;
-                if (retry >= THRIFT_RETRIES) {
-                    throw e;
-                }
-                try {
-                    Thread.sleep(THRIFT_TIMEOUT);
-                }
-                catch (InterruptedException e1) {
-                    m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift 
connection interrupted");
-                }
-                m_logService.log(LogService.LOG_INFO, "Thrift connection 
cannot yet be established, retrying... ("
-                    + retry + ")");
+            catch (InterruptedException e1) {
+                m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift 
connection interrupted");
             }
-            m_logService.log(LogService.LOG_INFO, "Thrift connection 
established successfully");
+            m_logService.log(LogService.LOG_INFO, "Thrift connection cannot 
yet be established, retrying... ("
+                            + retry + ")");
         }
+        m_logService.log(LogService.LOG_INFO, "Thrift connection established 
successfully");
     }
+
 }
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to