Author: [email protected]
Date: Mon Jun 11 15:30:22 2012
New Revision: 2458

Log:
[AMDATUCASSANDRA-191] Fixed Cassandra processes being launched simultaneously

Added:
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
Modified:
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
   
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java

Added: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
        Mon Jun 11 15:30:22 2012
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2010-2012 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.launcher;
+
+import org.apache.ace.processlauncher.ProcessLifecycleListener;
+
+/**
+ * Process lifecycle listener for the Cassandra process that also reports whether that process is running.
+ * 
+ * @author <a href="mailto:[email protected]";>Amdatu Project 
Team</a>
+ */
+public interface CassandraProcessLifecycleListener extends 
ProcessLifecycleListener {
+    /**
+     * Returns whether the Cassandra process is running (only one process can run at a time).
+     * 
+     * @return true if the Cassandra process is running, false otherwise.
+     */
+    boolean isRunning();
+}

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
   (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
   Mon Jun 11 15:30:22 2012
@@ -15,16 +15,19 @@
  */
 package org.amdatu.cassandra.launcher.osgi;
 
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
 import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
 import org.amdatu.cassandra.launcher.service.CassandraConfiguratorImpl;
 import org.amdatu.cassandra.launcher.service.CassandraInstallerImpl;
 import org.amdatu.cassandra.launcher.service.CassandraLauncherImpl;
 import org.amdatu.cassandra.launcher.service.CassandraManagerImpl;
+import org.amdatu.cassandra.launcher.service.ProcessLifecycleListenerImpl;
 import org.amdatu.cassandra.launcher.service.ProcessStreamListenerImpl;
 import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
 import org.amdatu.cassandra.launcher.service.api.CassandraInstaller;
 import org.amdatu.cassandra.launcher.service.api.CassandraLauncher;
 import org.amdatu.cassandra.launcher.service.api.CassandraManager;
+import org.apache.ace.processlauncher.ProcessLifecycleListener;
 import org.apache.ace.processlauncher.ProcessStreamListener;
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
@@ -81,16 +84,28 @@
                 .setInterface(CassandraManager.class.getName(), null)
                 
.add(createServiceDependency().setService(LogService.class).setRequired(true))
                 
.add(createServiceDependency().setService(CassandraLauncher.class).setRequired(true))
-                 
.add(createServiceDependency().setService(CassandraConfigurator.class).setRequired(true))
+                
.add(createServiceDependency().setService(CassandraConfigurator.class).setRequired(true))
+                
.add(createServiceDependency().setService(ConfigurationAdmin.class).setRequired(true))
+                
.add(createServiceDependency().setService(CassandraProcessLifecycleListener.class).setRequired(true))
                 
.add(createConfigurationDependency().setPid(CassandraManager.PID)));
 
-        // Register the Cassandra launcher service
+        // Register the Cassandra process stream listener service
         manager.add(
             createComponent()
                 .setImplementation(ProcessStreamListenerImpl.class)
                 .setInterface(
-                    new String[] {ProcessStreamListener.class.getName(),
-                        CassandraProcessStreamListener.class.getName() }, 
null));
+                    new String[] { ProcessStreamListener.class.getName(),
+                        CassandraProcessStreamListener.class.getName() }, null)
+                
.add(createServiceDependency().setService(LogService.class).setRequired(true)));
+
+        // Register the Cassandra process lifecycle listener service
+        manager.add(
+            createComponent()
+                .setImplementation(ProcessLifecycleListenerImpl.class)
+                .setInterface(
+                    new String[] { ProcessLifecycleListener.class.getName(),
+                        CassandraProcessLifecycleListener.class.getName() }, 
null)
+                
.add(createServiceDependency().setService(LogService.class).setRequired(true)));
 
     }
 

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
        (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
        Mon Jun 11 15:30:22 2012
@@ -229,9 +229,9 @@
         }
 
         // Overrule dirs to append workdir (one directory up)
-        context.put(COMMITLOG_DIR, 
getTargetFile(COMMITLOG_DIR).toString().replace("\\", "/"));
-        context.put(DATAFILE_DIR, 
getTargetFile(DATAFILE_DIR).toString().replace("\\", "/"));
-        context.put(SAVEDCACHES_DIR, 
getTargetFile(SAVEDCACHES_DIR).toString().replace("\\", "/"));
+        context.put(COMMITLOG_DIR, 
getTargetFile(COMMITLOG_DIR).getAbsolutePath().replace("\\", "/"));
+        context.put(DATAFILE_DIR, 
getTargetFile(DATAFILE_DIR).getAbsolutePath().replace("\\", "/"));
+        context.put(SAVEDCACHES_DIR, 
getTargetFile(SAVEDCACHES_DIR).getAbsolutePath().replace("\\", "/"));
 
         // Let the velocity processor replace the config entries and write the 
result
         processConfigFile(STORAGE_CONF_SOURCE, targetFile, context);
@@ -245,7 +245,7 @@
         // Ensure the aprent dir exists
         systemLog.getParentFile().mkdirs();
 
-        context.put(SYSTEMLOG_FILE, systemLog.toString().replace("\\", "/"));
+        context.put(SYSTEMLOG_FILE, systemLog.getAbsolutePath().replace("\\", 
"/"));
 
         // Set system_log dir property
         processConfigFile(LOG4J_SERVER_PROPERTIES, targetFile, context);

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
    (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
    Mon Jun 11 15:30:22 2012
@@ -21,6 +21,7 @@
 import java.util.Dictionary;
 import java.util.Hashtable;
 
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
 import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
 import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
 import org.amdatu.cassandra.launcher.service.api.CassandraInstaller;
@@ -43,12 +44,11 @@
     private volatile CassandraConfigurator m_configurator;
     private volatile CassandraInstaller m_installer;
 
-    private Configuration m_config;
-    private boolean m_isRunning = false;
+    private String m_configPid = null;
 
     @SuppressWarnings("rawtypes")
     public synchronized boolean startCassandra() {
-        if (m_isRunning) {
+        if (m_configPid != null) {
             m_logService.log(LogService.LOG_WARNING, "Cannot launch Cassandra, 
it is already running.");
             return false;
         }
@@ -71,6 +71,10 @@
 
         String streamFilter = "(objectClass=" + 
CassandraProcessStreamListener.class.getCanonicalName() + ")";
         properties.put("executable.processStreamListener", streamFilter);
+        
+        String lifecycleFilter = "(objectClass=" + 
CassandraProcessLifecycleListener.class.getCanonicalName() + ")";
+        properties.put("executable.processLifecycleListener", lifecycleFilter);
+        
         properties.put("executable.args", "");
 
         // Step 2b. Deploy the config to Config Admin
@@ -78,10 +82,9 @@
             m_logService.log(LogService.LOG_INFO, "Launching Cassandra...");
             m_logService.log(LogService.LOG_DEBUG, "Executable command:");
             m_logService.log(LogService.LOG_DEBUG, exe);
-            m_config = 
m_configAdmin.createFactoryConfiguration(ProcessLauncherService.PID, null);
-            m_config.update(properties);
-            m_logService.log(LogService.LOG_INFO, "Cassandra launched 
successfully.");
-            m_isRunning = true;
+            Configuration config = 
m_configAdmin.createFactoryConfiguration(ProcessLauncherService.PID, null);
+            config.update(properties);
+            m_configPid = config.getPid();
             return true;
         }
         catch (IOException e) {
@@ -93,26 +96,22 @@
 
     public synchronized void stopCassandra() {
         m_logService.log(LogService.LOG_INFO, "Stopping Cassandra 
instance...");
-        if (m_config != null) {
+        if (m_configPid != null) {
             try {
-                // Upon stopping this bundle, delete the config file. The ACE 
process launcher
-                // will notify this deletion and stop the external Cassandra 
process.
-                m_config.delete();
-                m_config = null;
-                m_isRunning = false;
-                m_logService.log(LogService.LOG_INFO, "Cassandra stopped 
successfully.");
+                Configuration config = 
m_configAdmin.getConfiguration(m_configPid, null);
+                if (config != null) {
+                    // Upon stopping this bundle, delete the config file. The 
ACE process launcher
+                    // will notify this deletion and stop the external 
Cassandra process.
+                    config.delete();
+                }
+                m_configPid = null;
             }
             catch (IOException e) {
-                m_logService.log(LogService.LOG_ERROR, "Could not stop 
Cassandra", e);
+                m_logService.log(LogService.LOG_ERROR, "An error occured while 
stopping Cassandra", e);
             }
         }
     }
 
-    @Override
-    public synchronized boolean isRunning() {
-        return m_isRunning;
-    }
-
     private String buildExecutable(String cassandraHome) {
         String javaOpts = m_configurator.getJavaOpts();
         String systemProperties = m_configurator.getSystemProperties();

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
     (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
     Mon Jun 11 15:30:22 2012
@@ -15,8 +15,11 @@
  */
 package org.amdatu.cassandra.launcher.service;
 
+import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.List;
 
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
 import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
 import org.amdatu.cassandra.launcher.service.api.CassandraLauncher;
 import org.amdatu.cassandra.launcher.service.api.CassandraManager;
@@ -28,45 +31,85 @@
  * 
  * @author <a href="mailto:[email protected]";>Amdatu Project 
Team</a>
  */
+@SuppressWarnings("rawtypes")
 public class CassandraManagerImpl implements CassandraManager, ManagedService {
+    // Polling interval of the queue handler thread, in milliseconds
+    private static final int INTERVAL = 1000;
+
     // Service dependencies
     private volatile CassandraConfigurator m_configurator;
     private volatile CassandraLauncher m_launcher;
+    private volatile CassandraProcessLifecycleListener m_processListener;
+
+    private QueueHandlerThread m_queueHandler;
 
-    @SuppressWarnings("rawtypes")
-    private Dictionary m_properties;
+    private List<Dictionary> m_queue = new ArrayList<Dictionary>();
 
     public synchronized void start() {
-        update();
+        // Start the thread that handles queued configuration updates
+        m_queueHandler = new QueueHandlerThread();
+        m_queueHandler.start();
     }
 
-    public synchronized void stop() {
-        if (m_launcher.isRunning()) {
-            m_launcher.stopCassandra();
-        }
+    public synchronized void stop() throws InterruptedException {
+        m_queueHandler.interrupt();
+        m_queueHandler.join();
     }
 
-    @SuppressWarnings("rawtypes")
     public synchronized void updated(final Dictionary dictionary) throws 
ConfigurationException {
         if (dictionary != null) {
-            m_properties = dictionary;
-            if (m_launcher != null) {
-                // This is a config update
-                update();
+            synchronized (m_queue) {
+                m_queue.add(dictionary);
             }
         }
     }
 
-    private void update() {
-        // Update the configurator with the new config
-        m_configurator.updateConfiguration(m_properties);
-
-        // Check if Cassandra is running, if so, stop it
-        if (m_launcher.isRunning()) {
-            m_launcher.stopCassandra();
+    class QueueHandlerThread extends Thread {
+        // Stop flag; volatile because interrupt() is called from a different thread than the one executing run()
+        private volatile boolean m_interrupted = false;
+
+        public void run() {
+            while (!m_interrupted) {
+                try {
+                    Thread.sleep(INTERVAL);
+
+                    // Check the queue
+                    if (!m_queue.isEmpty()) {
+                        // First check if a Cassandra process is currently running. If so, stop
+                        // it and wait for it to end.
+                        if (m_processListener.isRunning()) {
+                            m_launcher.stopCassandra();
+                            while (!m_interrupted && m_processListener.isRunning()) {
+                                Thread.sleep(INTERVAL);
+                            }
+                        }
+
+                        // Get the next config from the queue
+                        Dictionary properties;
+                        synchronized (m_queue) {
+                            properties =  m_queue.get(0);
+                            m_queue.remove(0);
+                        }
+                        
+                        // Update the configurator with the config
+                        m_configurator.updateConfiguration(properties);
+
+                        // Start Cassandra
+                        if (m_launcher.startCassandra()) {
+                            // Wait for the process to be launched
+                            while (!m_interrupted && !m_processListener.isRunning()) {
+                                Thread.sleep(INTERVAL);
+                            }
+                        }
+                    }
+                }
+                catch (InterruptedException e) {
+                }
+            }
         }
 
-        // Start Cassandra
-        m_launcher.startCassandra();
+        public void interrupt() {
+            m_interrupted = true;
+        }
     }
 }
\ No newline at end of file

Added: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
==============================================================================
--- (empty file)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
     Mon Jun 11 15:30:22 2012
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2010-2012 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.launcher.service;
+
+import java.util.Properties;
+
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
+import org.apache.ace.processlauncher.LaunchConfiguration;
+import org.osgi.service.log.LogService;
+
+public class ProcessLifecycleListenerImpl implements CassandraProcessLifecycleListener {
+    // Injected service dependencies
+    private volatile LogService m_logService;
+
+    // Whether Cassandra is running; volatile so updates made in the lifecycle callbacks are visible to threads polling isRunning()
+    private volatile boolean m_isRunning = false;
+
+    public Properties beforeProcessStart(LaunchConfiguration configuration) {
+        m_logService.log(LogService.LOG_INFO, "Launching Cassandra process...");
+        m_isRunning = true;
+        return null;
+    }
+
+    public void afterProcessEnd(LaunchConfiguration configuration) {
+        m_logService.log(LogService.LOG_INFO, "Cassandra process stopped.");
+        m_isRunning = false;
+    }
+
+    public boolean isRunning() {
+        return m_isRunning;
+    }
+}

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
        (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
        Mon Jun 11 15:30:22 2012
@@ -15,12 +15,11 @@
  */
 package org.amdatu.cassandra.launcher.service;
 
-import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
 import org.apache.ace.processlauncher.LaunchConfiguration;
 import org.apache.commons.compress.utils.IOUtils;
 
@@ -30,6 +29,7 @@
  * @author <a href="mailto:[email protected]";>Amdatu Project 
Team</a>
  */
 public class ProcessStreamListenerImpl implements 
CassandraProcessStreamListener {
+    // The input stream to dispatch to
     private InputStream m_inputStream = null;
 
     public void setStdin(LaunchConfiguration launchConfiguration, OutputStream 
outputStream) {
@@ -49,9 +49,6 @@
     }
 
     private void dispatchInput() {
-        // FIXME: output should be dispatched to LogService instead of 
System.out but that will
-        // be difficult since this class is instantiated by the launcher. For 
now we dispatch simply
-        // to System.out
         new Thread(new Runnable() {
             public void run() {
                 try {

Modified: 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
==============================================================================
--- 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
    (original)
+++ 
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
    Mon Jun 11 15:30:22 2012
@@ -30,13 +30,6 @@
     boolean startCassandra();
 
     /**
-     * Returns if Cassandra is currently running.
-     * 
-     * @return true if Cassandra is currently running, false otherwise.
-     */
-    boolean isRunning();
-
-    /**
      * Stops the external Cassandra process.
      */
     void stopCassandra();
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits

Reply via email to