Author: [email protected]
Date: Mon Jun 11 15:30:22 2012
New Revision: 2458
Log:
[AMDATUCASSANDRA-191] Fixed Cassandra processes being launched simultaneously
Added:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
Added:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/CassandraProcessLifecycleListener.java
Mon Jun 11 15:30:22 2012
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2010-2012 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.launcher;
+
+import org.apache.ace.processlauncher.ProcessLifecycleListener;
+
+/**
+ * The Cassandra process lifecycle listener listens.
+ *
+ * @author <a href="mailto:[email protected]">Amdatu Project
Team</a>
+ */
+public interface CassandraProcessLifecycleListener extends
ProcessLifecycleListener {
+ /**
+ * Returns if the (only 1 process can run at a time) Cassandra process is
running.
+ *
+ * @return true if the Cassandra process is running, false otherwise.
+ */
+ boolean isRunning();
+}
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/osgi/Activator.java
Mon Jun 11 15:30:22 2012
@@ -15,16 +15,19 @@
*/
package org.amdatu.cassandra.launcher.osgi;
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
import org.amdatu.cassandra.launcher.service.CassandraConfiguratorImpl;
import org.amdatu.cassandra.launcher.service.CassandraInstallerImpl;
import org.amdatu.cassandra.launcher.service.CassandraLauncherImpl;
import org.amdatu.cassandra.launcher.service.CassandraManagerImpl;
+import org.amdatu.cassandra.launcher.service.ProcessLifecycleListenerImpl;
import org.amdatu.cassandra.launcher.service.ProcessStreamListenerImpl;
import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
import org.amdatu.cassandra.launcher.service.api.CassandraInstaller;
import org.amdatu.cassandra.launcher.service.api.CassandraLauncher;
import org.amdatu.cassandra.launcher.service.api.CassandraManager;
+import org.apache.ace.processlauncher.ProcessLifecycleListener;
import org.apache.ace.processlauncher.ProcessStreamListener;
import org.apache.felix.dm.DependencyActivatorBase;
import org.apache.felix.dm.DependencyManager;
@@ -81,16 +84,28 @@
.setInterface(CassandraManager.class.getName(), null)
.add(createServiceDependency().setService(LogService.class).setRequired(true))
.add(createServiceDependency().setService(CassandraLauncher.class).setRequired(true))
-
.add(createServiceDependency().setService(CassandraConfigurator.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraConfigurator.class).setRequired(true))
+
.add(createServiceDependency().setService(ConfigurationAdmin.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraProcessLifecycleListener.class).setRequired(true))
.add(createConfigurationDependency().setPid(CassandraManager.PID)));
- // Register the Cassandra launcher service
+ // Register the Cassandra process stream listener service
manager.add(
createComponent()
.setImplementation(ProcessStreamListenerImpl.class)
.setInterface(
- new String[] {ProcessStreamListener.class.getName(),
- CassandraProcessStreamListener.class.getName() },
null));
+ new String[] { ProcessStreamListener.class.getName(),
+ CassandraProcessStreamListener.class.getName() }, null)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true)));
+
+ // Register the Cassandra process lifecyle listener service
+ manager.add(
+ createComponent()
+ .setImplementation(ProcessLifecycleListenerImpl.class)
+ .setInterface(
+ new String[] { ProcessLifecycleListener.class.getName(),
+ CassandraProcessLifecycleListener.class.getName() },
null)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true)));
}
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraConfiguratorImpl.java
Mon Jun 11 15:30:22 2012
@@ -229,9 +229,9 @@
}
// Overrule dirs to append workdir (one directory up)
- context.put(COMMITLOG_DIR,
getTargetFile(COMMITLOG_DIR).toString().replace("\\", "/"));
- context.put(DATAFILE_DIR,
getTargetFile(DATAFILE_DIR).toString().replace("\\", "/"));
- context.put(SAVEDCACHES_DIR,
getTargetFile(SAVEDCACHES_DIR).toString().replace("\\", "/"));
+ context.put(COMMITLOG_DIR,
getTargetFile(COMMITLOG_DIR).getAbsolutePath().replace("\\", "/"));
+ context.put(DATAFILE_DIR,
getTargetFile(DATAFILE_DIR).getAbsolutePath().replace("\\", "/"));
+ context.put(SAVEDCACHES_DIR,
getTargetFile(SAVEDCACHES_DIR).getAbsolutePath().replace("\\", "/"));
// Let the velocity processor replace the config entries and write the
result
processConfigFile(STORAGE_CONF_SOURCE, targetFile, context);
@@ -245,7 +245,7 @@
// Ensure the aprent dir exists
systemLog.getParentFile().mkdirs();
- context.put(SYSTEMLOG_FILE, systemLog.toString().replace("\\", "/"));
+ context.put(SYSTEMLOG_FILE, systemLog.getAbsolutePath().replace("\\",
"/"));
// Set system_log dir property
processConfigFile(LOG4J_SERVER_PROPERTIES, targetFile, context);
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraLauncherImpl.java
Mon Jun 11 15:30:22 2012
@@ -21,6 +21,7 @@
import java.util.Dictionary;
import java.util.Hashtable;
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
import org.amdatu.cassandra.launcher.service.api.CassandraInstaller;
@@ -43,12 +44,11 @@
private volatile CassandraConfigurator m_configurator;
private volatile CassandraInstaller m_installer;
- private Configuration m_config;
- private boolean m_isRunning = false;
+ private String m_configPid = null;
@SuppressWarnings("rawtypes")
public synchronized boolean startCassandra() {
- if (m_isRunning) {
+ if (m_configPid != null) {
m_logService.log(LogService.LOG_WARNING, "Cannot launch Cassandra,
it is already running.");
return false;
}
@@ -71,6 +71,10 @@
String streamFilter = "(objectClass=" +
CassandraProcessStreamListener.class.getCanonicalName() + ")";
properties.put("executable.processStreamListener", streamFilter);
+
+ String lifecycleFilter = "(objectClass=" +
CassandraProcessLifecycleListener.class.getCanonicalName() + ")";
+ properties.put("executable.processLifecycleListener", lifecycleFilter);
+
properties.put("executable.args", "");
// Step 2b. Deploy the config to Config Admin
@@ -78,10 +82,9 @@
m_logService.log(LogService.LOG_INFO, "Launching Cassandra...");
m_logService.log(LogService.LOG_DEBUG, "Executable command:");
m_logService.log(LogService.LOG_DEBUG, exe);
- m_config =
m_configAdmin.createFactoryConfiguration(ProcessLauncherService.PID, null);
- m_config.update(properties);
- m_logService.log(LogService.LOG_INFO, "Cassandra launched
successfully.");
- m_isRunning = true;
+ Configuration config =
m_configAdmin.createFactoryConfiguration(ProcessLauncherService.PID, null);
+ config.update(properties);
+ m_configPid = config.getPid();
return true;
}
catch (IOException e) {
@@ -93,26 +96,22 @@
public synchronized void stopCassandra() {
m_logService.log(LogService.LOG_INFO, "Stopping Cassandra
instance...");
- if (m_config != null) {
+ if (m_configPid != null) {
try {
- // Upon stopping this bundle, delete the config file. The ACE
process launcher
- // will notify this deletion and stop the external Cassandra
process.
- m_config.delete();
- m_config = null;
- m_isRunning = false;
- m_logService.log(LogService.LOG_INFO, "Cassandra stopped
successfully.");
+ Configuration config =
m_configAdmin.getConfiguration(m_configPid, null);
+ if (config != null) {
+ // Upon stopping this bundle, delete the config file. The
ACE process launcher
+ // will notify this deletion and stop the external
Cassandra process.
+ config.delete();
+ }
+ m_configPid = null;
}
catch (IOException e) {
- m_logService.log(LogService.LOG_ERROR, "Could not stop
Cassandra", e);
+ m_logService.log(LogService.LOG_ERROR, "An error occured while
stopping Cassandra", e);
}
}
}
- @Override
- public synchronized boolean isRunning() {
- return m_isRunning;
- }
-
private String buildExecutable(String cassandraHome) {
String javaOpts = m_configurator.getJavaOpts();
String systemProperties = m_configurator.getSystemProperties();
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/CassandraManagerImpl.java
Mon Jun 11 15:30:22 2012
@@ -15,8 +15,11 @@
*/
package org.amdatu.cassandra.launcher.service;
+import java.util.ArrayList;
import java.util.Dictionary;
+import java.util.List;
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
import org.amdatu.cassandra.launcher.service.api.CassandraConfigurator;
import org.amdatu.cassandra.launcher.service.api.CassandraLauncher;
import org.amdatu.cassandra.launcher.service.api.CassandraManager;
@@ -28,45 +31,85 @@
*
* @author <a href="mailto:[email protected]">Amdatu Project
Team</a>
*/
+@SuppressWarnings("rawtypes")
public class CassandraManagerImpl implements CassandraManager, ManagedService {
+ // Interval for the update thread
+ private static final int INTERVAL = 1000;
+
// Service dependencies
private volatile CassandraConfigurator m_configurator;
private volatile CassandraLauncher m_launcher;
+ private volatile CassandraProcessLifecycleListener m_processListener;
+
+ private QueueHandlerThread m_queueHandler;
- @SuppressWarnings("rawtypes")
- private Dictionary m_properties;
+ private List<Dictionary> m_queue = new ArrayList<Dictionary>();
public synchronized void start() {
- update();
+ // Now start the inspect thread
+ m_queueHandler = new QueueHandlerThread();
+ m_queueHandler.start();
}
- public synchronized void stop() {
- if (m_launcher.isRunning()) {
- m_launcher.stopCassandra();
- }
+ public synchronized void stop() throws InterruptedException {
+ m_queueHandler.interrupt();
+ m_queueHandler.join();
}
- @SuppressWarnings("rawtypes")
public synchronized void updated(final Dictionary dictionary) throws
ConfigurationException {
if (dictionary != null) {
- m_properties = dictionary;
- if (m_launcher != null) {
- // This is a config update
- update();
+ synchronized (m_queue) {
+ m_queue.add(dictionary);
}
}
}
- private void update() {
- // Update the configurator with the new config
- m_configurator.updateConfiguration(m_properties);
-
- // Check if Cassandra is running, if so, stop it
- if (m_launcher.isRunning()) {
- m_launcher.stopCassandra();
+ class QueueHandlerThread extends Thread {
+ // Interruption parameter
+ private boolean m_interrupted = false;
+
+ public void run() {
+ while (!m_interrupted) {
+ try {
+ Thread.sleep(INTERVAL);
+
+ // Check the queue
+ if (!m_queue.isEmpty()) {
+ // First check if a Cassandra process is currently
running. If so, stop
+ // it and wait for it to end.
+ if (m_processListener.isRunning()) {
+ m_launcher.stopCassandra();
+ while (!m_interrupted &&
m_processListener.isRunning()) {
+ Thread.sleep(INTERVAL);
+ }
+ }
+
+ // Get the next config from the queue
+ Dictionary properties;
+ synchronized (m_queue) {
+ properties = m_queue.get(0);
+ m_queue.remove(0);
+ }
+
+ // Update the configurator with the config
+ m_configurator.updateConfiguration(properties);
+
+ // Start Cassandra
+ if (m_launcher.startCassandra()) {
+ // Wait for the process to be launched
+ while (!m_interrupted &&
!m_processListener.isRunning()) {
+ Thread.sleep(INTERVAL);
+ }
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ }
+ }
}
- // Start Cassandra
- m_launcher.startCassandra();
+ public void interrupt() {
+ m_interrupted = true;
+ }
}
}
\ No newline at end of file
Added:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessLifecycleListenerImpl.java
Mon Jun 11 15:30:22 2012
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2010-2012 The Amdatu Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.amdatu.cassandra.launcher.service;
+
+import java.util.Properties;
+
+import org.amdatu.cassandra.launcher.CassandraProcessLifecycleListener;
+import org.apache.ace.processlauncher.LaunchConfiguration;
+import org.osgi.service.log.LogService;
+
+public class ProcessLifecycleListenerImpl implements
CassandraProcessLifecycleListener {
+ // Injected service dependencies
+ private volatile LogService m_logService;
+
+ // Remembers if Cassandra is currently running
+ private boolean m_isRunning = false;
+
+ public Properties beforeProcessStart(LaunchConfiguration configuration) {
+ m_logService.log(LogService.LOG_INFO, "Launching Cassandra
process...");
+ m_isRunning = true;
+ return null;
+ }
+
+ public void afterProcessEnd(LaunchConfiguration configuration) {
+ m_logService.log(LogService.LOG_INFO, "Cassandra process stopped.");
+ m_isRunning = false;
+ }
+
+ public boolean isRunning() {
+ return m_isRunning;
+ }
+}
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/ProcessStreamListenerImpl.java
Mon Jun 11 15:30:22 2012
@@ -15,12 +15,11 @@
*/
package org.amdatu.cassandra.launcher.service;
-import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.amdatu.cassandra.launcher.CassandraProcessStreamListener;
import org.apache.ace.processlauncher.LaunchConfiguration;
import org.apache.commons.compress.utils.IOUtils;
@@ -30,6 +29,7 @@
* @author <a href="mailto:[email protected]">Amdatu Project
Team</a>
*/
public class ProcessStreamListenerImpl implements
CassandraProcessStreamListener {
+ // The input stream to dispatch to
private InputStream m_inputStream = null;
public void setStdin(LaunchConfiguration launchConfiguration, OutputStream
outputStream) {
@@ -49,9 +49,6 @@
}
private void dispatchInput() {
- // FIXME: output should be dispatched to LogService instead of
System.out but that will
- // be difficult since this class is instantiated by the launcher. For
now we dispatch simply
- // to System.out
new Thread(new Runnable() {
public void run() {
try {
Modified:
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
(original)
+++
trunk/amdatu-cassandra/cassandra-launcher/src/main/java/org/amdatu/cassandra/launcher/service/api/CassandraLauncher.java
Mon Jun 11 15:30:22 2012
@@ -30,13 +30,6 @@
boolean startCassandra();
/**
- * Returns if Cassandra is currently running.
- *
- * @return true if Cassandra is currently running, false otherwise.
- */
- boolean isRunning();
-
- /**
* Stops the external Cassandra process.
*/
void stopCassandra();
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits