Author: ivol37 at gmail.com
Date: Wed Jan 19 10:45:51 2011
New Revision: 677
Log:
[AMDATU-254] Refactored the dependency of the daemon service on the
availability of the internal Cassandra daemon, such that starting Cassandra can
be interrupted when needed (for example, when bootstrapping fails and the daemon
never reaches the 'Normal' operation mode)
Added:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/osgi/Activator.java
Wed Jan 19 10:45:51 2011
@@ -19,6 +19,8 @@
import org.amdatu.cassandra.application.CassandraConfigurationService;
import org.amdatu.cassandra.application.CassandraDaemonService;
import
org.amdatu.cassandra.application.service.CassandraConfigurationServiceImpl;
+import org.amdatu.cassandra.application.service.CassandraDaemonActivatorImpl;
+import org.amdatu.cassandra.application.service.CassandraDaemonAvailable;
import org.amdatu.cassandra.application.service.CassandraDaemonServiceImpl;
import org.amdatu.core.config.templates.ConfigTemplateManager;
import org.apache.felix.dm.DependencyActivatorBase;
@@ -44,11 +46,19 @@
.add(createServiceDependency().setService(ConfigTemplateManager.class).setRequired(true))
.add(createConfigurationDependency().setPid(CassandraConfigurationServiceImpl.PID)));
+ // Register the Cassandra daemon activator
+ manager.add(
+ createComponent()
+ .setImplementation(CassandraDaemonActivatorImpl.class)
+
.add(createServiceDependency().setService(LogService.class).setRequired(true))
+
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
+
// Register the Cassandra daemon service
manager.add(
createComponent()
.setImplementation(CassandraDaemonServiceImpl.class)
.setInterface(CassandraDaemonService.class.getName(), null)
+
.add(createServiceDependency().setService(CassandraDaemonAvailable.class).setRequired(true))
.add(createServiceDependency().setService(LogService.class).setRequired(true))
.add(createServiceDependency().setService(EventAdmin.class).setRequired(true))
.add(createServiceDependency().setService(CassandraConfigurationService.class).setRequired(true)));
Added:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonActivatorImpl.java
Wed Jan 19 10:45:51 2011
@@ -0,0 +1,179 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.application.service;
+
+import org.amdatu.cassandra.application.CassandraConfigurationService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraDaemon;
+import org.apache.felix.dm.DependencyManager;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.osgi.service.log.LogService;
+
+public class CassandraDaemonActivatorImpl implements CassandraDaemonAvailable {
+ // Timeout for the initial Thrift connection
+ private static final int THRIFT_RETRIES = 3;
+ private static final int THRIFT_TIMEOUT = 3000;
+ private static final int DAEMON_TIMEOUT = 5000;
+
+ // Service dependencies, injected by the framework
+ private volatile LogService m_logService;
+ private volatile CassandraConfigurationService m_configuration = null;
+ private volatile DependencyManager m_dependencyManager;
+
+ // The Cassandra daemon
+ private CassandraDaemon m_daemon = null;
+
+ // The CassandraDaemon cannot be stopped/started without stopping and
updating (to enforce classloader
+ // to be destroyed) this bundle. For that reason we block any attempts to
stop/start this service since
+ // that will fail.
+ private static boolean m_daemonHasShutdown = false;
+
+ /**
+ * The init() method is invoked by the Felix dependency manager.
+ */
+ public void init() {
+ if (m_daemonHasShutdown) {
+ throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
+ }
+
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
initialized");
+
+ try {
+ // Setup the cassandra daemon
+ m_daemon = new CassandraDaemon();
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + "
service started.");
+ }
+ catch (Throwable t) {
+ m_logService.log(LogService.LOG_ERROR, "An error occurred while
starting Cassandra service", t);
+ }
+ }
+
+ public void start() {
+ if (m_daemonHasShutdown) {
+ throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
+ }
+
+ m_logService.log(LogService.LOG_INFO, "Starting Cassandra Daemon with
configuration: ");
+ m_logService.log(LogService.LOG_INFO, " Auto bootstrap mode = " +
m_configuration.isAutoBootstrapMode());
+ m_logService.log(LogService.LOG_INFO, " Default replication factor =
" + m_configuration.getDefaultReplicationFactor());
+ m_logService.log(LogService.LOG_INFO, " Read consistency level= " +
m_configuration.getReadConsistencyLevel());
+ m_logService.log(LogService.LOG_INFO, " Write consistency level = " +
m_configuration.getWriteConsistencyLevel());
+ m_logService.log(LogService.LOG_INFO, " RPC address = " +
m_configuration.getRPCAddress());
+ m_logService.log(LogService.LOG_INFO, " RPC Port = " +
m_configuration.getRPCPort());
+
+ // Activate the daemon from a separate thread, as the activate()
method never returns
+ new CassandraDaemonActivateThread().start();
+
+ // Start a new daemon listener thread. It will register a
CassandraDaemonAvailableService
+ // as soon as the daemon reached operation mode 'Normal' and a Thrift
connection could be
+ // established
+ new CassandraDaemonListenerThread().start();
+ }
+
+ public void stop() {
+ m_logService.log(LogService.LOG_INFO, "Shutting down Cassandra
Daemon");
+ m_daemon.deactivate();
+ m_daemonHasShutdown = true;
+ m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
+ }
+
+ public void destroy() {
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
destroyed");
+ }
+
+ class CassandraDaemonActivateThread extends Thread {
+ public void run() {
+ m_daemon.activate();
+ }
+ }
+
+ class CassandraDaemonListenerThread extends Thread {
+ public void run() {
+ try {
+ // First wait until we can establish a Thrift connection to
the daemon, the connection is
+ // established as soon as the daemon is running.
+ testThriftConnection();
+
+ // Now wait until the operation mode of the daemon becomes
"Normal". In auto bootstrap mode this can take quite a
+ // while (2 minutes minimum). In a single node cluster this
will be almost immediately. Unfortunately the operation
+ // mode is not covered by any enum value.
+ String prevOperationMode = "";
+ String operationMode =
StorageService.instance.getOperationMode();
+ while (!operationMode.equals("Normal") && !isInterrupted() &&
isAlive()) {
+ if (!operationMode.equals(prevOperationMode)) {
+ m_logService.log(LogService.LOG_INFO, "Current
Cassandra Daemon operation mode is '" + operationMode
+ + "', waiting for daemon to reach operation mode
'Normal'");
+ }
+ prevOperationMode = operationMode;
+ operationMode = StorageService.instance.getOperationMode();
+ Thread.sleep(DAEMON_TIMEOUT);
+ }
+
+ if
("Normal".equals(StorageService.instance.getOperationMode())) {
+ m_logService.log(LogService.LOG_INFO, "Operation mode is
now 'Normal', continuing starting Cassandra");
+
+ // Register a new CassandraDaemonAvailable service
+ CassandraDaemonAvailable service = new
CassandraDaemonAvailable() {};
+ m_dependencyManager.add(
+ m_dependencyManager.createComponent()
+
.setInterface(CassandraDaemonAvailable.class.getName(), null)
+ .setImplementation(service));
+ }
+ }
+ catch (TTransportException e) {
+ m_logService.log(LogService.LOG_ERROR, "Could not establish
Thrift connection to Cassandra daemon, daemon could not be started.");
+ }
+ catch (InterruptedException e) {
+ m_logService.log(LogService.LOG_ERROR, "Starting Cassandra
daemon interrupted.");
+ }
+ }
+
+ /**
+ * This methods opens a Thrift connection to the Cassandra daemon and
returns if the connection
+ * has been established. This is usefull to ensure that the daemon is
running before continuing.
+ * @throws TTransportException
+ */
+ private void testThriftConnection() throws TTransportException {
+ int retry = 0;
+ try {
+ String thrift = m_configuration.getRPCAddress() + ":" +
m_configuration.getRPCPort();
+ m_logService.log(LogService.LOG_INFO, "Establishing Thrift
connection to the Cassandra Daemon on " + thrift);
+ TTransport tr = new TSocket(m_configuration.getRPCAddress(),
m_configuration.getRPCPort(), THRIFT_TIMEOUT);
+ TProtocol proto = new TBinaryProtocol(tr);
+ new Cassandra.Client(proto);
+ tr.open();
+ } catch (TTransportException e) {
+ retry++;
+ if (retry >= THRIFT_RETRIES) {
+ throw e;
+ }
+ try {
+ Thread.sleep(THRIFT_TIMEOUT);
+ }
+ catch (InterruptedException e1) {
+ }
+ m_logService.log(LogService.LOG_INFO, "Thrift connection
cannot yet be established, retrying... (" + retry + ")");
+ }
+ m_logService.log(LogService.LOG_INFO, "Thrift connection
established successfully");
+ }
+ }
+}
\ No newline at end of file
Added:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
==============================================================================
--- (empty file)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonAvailable.java
Wed Jan 19 10:45:51 2011
@@ -0,0 +1,26 @@
+/*
+ Copyright (C) 2010 Amdatu.org
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.amdatu.cassandra.application.service;
+
+/**
+ * The sole purpose of this interface is to allow the CassandraDaemonService to define a service
+ * dependency on the availability of the internal Cassandra daemon.
+ *
+ * It is a marker interface and declares no methods. CassandraDaemonActivatorImpl registers an
+ * instance of it once a Thrift connection to the daemon could be established and the daemon
+ * reached operation mode 'Normal'.
+ *
+ * @author ivol
+ */
+public interface CassandraDaemonAvailable {
+}
Modified:
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-application/src/main/java/org/amdatu/cassandra/application/service/CassandraDaemonServiceImpl.java
Wed Jan 19 10:45:51 2011
@@ -25,19 +25,12 @@
import org.amdatu.cassandra.application.CassandraDaemonService;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.cassandra.thrift.CassandraServer;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.log.LogService;
@@ -48,11 +41,6 @@
* @author ivol
*/
public class CassandraDaemonServiceImpl implements CassandraDaemonService {
- // Timeout for the initial Thrift connection
- private final int THRIFT_RETRIES = 3;
- private final int THRIFT_TIMEOUT = 3000;
- private final int DAEMON_TIMEOUT = 5000;
-
// The default placement strategy
private final String DEFAULT_PLACEMENT_STRATEGY =
"org.apache.cassandra.locator.SimpleStrategy";
@@ -61,85 +49,21 @@
private volatile EventAdmin m_eventAdmin;
private volatile CassandraConfigurationService m_configuration = null;
- // The CassandraDaemon cannot be stopped/started without stopping and
updating (to enforce classloader
- // to be destroyed) this bundle. For that reason we block any attempts to
stop/start this service since
- // that will fail.
- private static boolean m_daemonHasShutdown = false;
-
- private CassandraDaemon m_daemon = null;
private CassandraServer m_cassandraServer = null;
/**
* The init() method is invoked by the Felix dependency manager.
*/
public void init() {
- if (m_daemonHasShutdown) {
- throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
- }
-
m_logService.log(LogService.LOG_INFO, getClass().getName() + " service
initialized");
-
- try {
- // Setup the cassandra daemon
- m_daemon = new CassandraDaemon();
- m_logService.log(LogService.LOG_INFO, getClass().getName() + "
service started.");
- }
- catch (Throwable t) {
- m_logService.log(LogService.LOG_ERROR, "An error occurred while
starting Cassandra service", t);
- }
}
public void start() {
- if (m_daemonHasShutdown) {
- throw new RuntimeException("CassandraDaemon has already been
shutdown and cannot be restarted.");
- }
-
- m_logService.log(LogService.LOG_INFO, "Starting Cassandra Daemon with
configuration: ");
- m_logService.log(LogService.LOG_INFO, " Auto bootstrap mode = " +
m_configuration.isAutoBootstrapMode());
- m_logService.log(LogService.LOG_INFO, " Default replication factor =
" + m_configuration.getDefaultReplicationFactor());
- m_logService.log(LogService.LOG_INFO, " Read consistency level= " +
m_configuration.getReadConsistencyLevel());
- m_logService.log(LogService.LOG_INFO, " Write consistency level = " +
m_configuration.getWriteConsistencyLevel());
- m_logService.log(LogService.LOG_INFO, " RPC address = " +
m_configuration.getRPCAddress());
- m_logService.log(LogService.LOG_INFO, " RPC Port = " +
m_configuration.getRPCPort());
-
- // Activate the daemon from a separate thread, as the activate()
method never returns
- new CassandraDaemonActivateThread().start();
- try {
- // Now establish a Thrift connection to the daemon, the connection
is established as soon
- // as the daemon is running.
- testThriftConnection();
-
- // Now wait until the operation mode of Cassandra becomes
"Normal". In auto bootstrap mode this can take quite a
- // while (2 minutes minimum). In a single node cluster this will
be amost immediately. Unfortunately the operation
- // mode is not covered by any enum value.
- String prevOperationMode = "";
- String operationMode = StorageService.instance.getOperationMode();
- while (!operationMode.equals("Normal")) {
- if (!operationMode.equals(prevOperationMode)) {
- m_logService.log(LogService.LOG_INFO, "Current Cassandra
Daemon operation mode is '" + operationMode
- + "', waiting for daemon to reach operation mode
'Normal'");
- }
- Thread.sleep(DAEMON_TIMEOUT);
- }
- m_logService.log(LogService.LOG_INFO, "Operation mode is now '" +
StorageService.instance.getOperationMode()
- + "', continuing starting Cassandra");
-
- // Create the cassandra server
- m_cassandraServer = new CassandraServer();
- }
- catch (TTransportException e) {
- m_logService.log(LogService.LOG_INFO, "Could not establish a
Thrift connection to the Cassandra Daemon", e);
- }
- catch (InterruptedException e) {
- m_logService.log(LogService.LOG_INFO, "Waiting for Cassandra
Daemon to reach normal operation mode interrupted", e);
- }
+ m_cassandraServer = new CassandraServer();
}
public void stop() {
- m_logService.log(LogService.LOG_INFO, "Shutting down Cassandra
Daemon");
- m_daemon.deactivate();
- m_daemonHasShutdown = true;
- m_logService.log(LogService.LOG_INFO, "Cassandra Daemon stopped");
+ // Shutting down the daemon itself is handled by CassandraDaemonActivatorImpl; only log here.
+ // Note the leading space in the message, consistent with the " service initialized" message.
+ m_logService.log(LogService.LOG_INFO, getClass().getName() + " service stopped");
}
public void destroy() {
@@ -304,40 +228,4 @@
}
return false;
}
-
- class CassandraDaemonActivateThread extends Thread {
- public void run() {
- m_daemon.activate();
- }
- }
-
- /**
- * This methods opens a Thrift connection to the Cassandra daemon and
returns if the connection
- * has been established. This is usefull to ensure that the daemon is
running before continuing.
- * @throws TTransportException
- */
- private void testThriftConnection() throws TTransportException {
- int retry = 0;
- try {
- String thrift = m_configuration.getRPCAddress() + ":" +
m_configuration.getRPCPort();
- m_logService.log(LogService.LOG_INFO, "Establishing Thrift
connection to the Cassandra Daemon on " + thrift);
- TTransport tr = new TSocket(m_configuration.getRPCAddress(),
m_configuration.getRPCPort(), THRIFT_TIMEOUT);
- TProtocol proto = new TBinaryProtocol(tr);
- new Cassandra.Client(proto);
- tr.open();
- } catch (TTransportException e) {
- retry++;
- if (retry >= THRIFT_RETRIES) {
- throw e;
- }
- try {
- Thread.sleep(THRIFT_TIMEOUT);
- }
- catch (InterruptedException e1) {
- }
- m_logService.log(LogService.LOG_INFO, "Thrift connection cannot
yet be established, retrying... (" + retry + ")");
- }
- m_logService.log(LogService.LOG_INFO, "Thrift connection established
successfully");
- }
-
}