Author: [email protected]
Date: Fri Feb 17 14:57:14 2012
New Revision: 2100
Log:
[AMDATUCASSANDRA-164] Reintroduced the separate thread, as it causes a deadlock
when client and server run in the same VM: the start method blocks until the
daemon is available, but the same thread starts all bundles
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
Modified:
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
==============================================================================
---
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
(original)
+++
trunk/amdatu-cassandra/cassandra-client/src/main/java/org/amdatu/cassandra/client/service/CassandraClientActivatorServiceImpl.java
Fri Feb 17 14:57:14 2012
@@ -15,19 +15,14 @@
*/
package org.amdatu.cassandra.client.service;
-import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.HOSTS;
-import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.PORT;
+import static
org.amdatu.cassandra.client.service.CassandraClientConfigurationService.CLUSTER_NAME;
+import me.prettyprint.cassandra.service.ThriftCluster;
+import me.prettyprint.hector.api.Cluster;
import org.amdatu.cassandra.client.CassandraDaemonAvailable;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.felix.dm.Component;
import org.apache.felix.dm.DependencyManager;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.osgi.service.log.LogService;
/**
@@ -43,7 +38,7 @@
*/
public class CassandraClientActivatorServiceImpl {
// Timeout for the initial Thrift connection
- private static final int THRIFT_RETRIES = 3;
+ private static final int THRIFT_RETRIES = 5;
private static final int THRIFT_TIMEOUT = 3000;
// Service dependencies injected by the dependency manager
@@ -51,77 +46,69 @@
private volatile LogService m_logService;
private volatile CassandraClientConfigurationService m_configuration;
+ private static CassandraDaemonListenerThread m_activatorThread = null;
private Component m_serviceAvailableComponent = null;
- public synchronized void start() {
- // Test the Thrift connection and register a
CassandraDaemonAvailableService
- // as soon as a Thrift connection could be established
- registerAvailableServiec();
- }
-
- public synchronized void stop() {
- // Unregister the CassandraDaemonAvailable service component
- if (m_serviceAvailableComponent != null) {
- m_dependencyManager.remove(m_serviceAvailableComponent);
+ public void start() {
+ // We start the activator thread only once during the lifetime of this
bundle classloader.
+ // It is only intended to hold the system until the Cassandra host is
available, this may
+ // take some time in case the Cassandra server runs inside this very
same container.
+ if (m_activatorThread == null) {
+ // Start a new daemon listener thread. It will register a
CassandraDaemonAvailableService
+ // as soon as Hector can successfully create a connection to the
Cassandra server
+ m_activatorThread = new CassandraDaemonListenerThread();
+ m_activatorThread.start();
}
}
- private void registerAvailableServiec() {
- try {
+ class CassandraDaemonListenerThread extends Thread {
+ public void run() {
// First wait until we can establish a Thrift connection to the
daemon, the connection is
// established as soon as the daemon is running.
- testThriftConnection();
+ if (waitForConnection()) {
- // Register a new CassandraDaemonAvailable service
- CassandraDaemonAvailable service = new CassandraDaemonAvailable() {
- };
-
- m_serviceAvailableComponent = m_dependencyManager.createComponent()
-
.setInterface(CassandraDaemonAvailable.class.getName(), null)
- .setImplementation(service);
- m_dependencyManager.add(m_serviceAvailableComponent);
- }
- catch (TTransportException e) {
- m_logService.log(LogService.LOG_ERROR, "Could not establish Thrift
connection to Cassandra daemon, "
- + "daemon could not be started.");
+ // Register a new CassandraDaemonAvailable service
+ CassandraDaemonAvailable service = new
CassandraDaemonAvailable() {
+ };
+
+ m_serviceAvailableComponent =
m_dependencyManager.createComponent()
+ .setInterface(CassandraDaemonAvailable.class.getName(),
null)
+ .setImplementation(service);
+ m_dependencyManager.add(m_serviceAvailableComponent);
+ }
}
}
- /**
- * This methods opens a Thrift connection to the Cassandra daemon and
returns if the connection
- * has been established. This is useful to ensure that the daemon is
running before continuing.
- *
- * @throws TTransportException
- */
- private void testThriftConnection() throws TTransportException {
+ private boolean waitForConnection() {
+ // Try THRIFT_RETRIES times to get a connection
int retry = 0;
+ Cluster cluster = null;
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
try {
- String thrift =
- m_configuration.get(HOSTS, String.class) + ":" +
m_configuration.get(PORT, Integer.class);
- m_logService.log(LogService.LOG_INFO, "Establishing Thrift
connection to the Cassandra Daemon on "
- + thrift);
- TTransport tr =
- new TSocket(m_configuration.get(HOSTS,
String.class), m_configuration.get(PORT, Integer.class),
- THRIFT_TIMEOUT);
- TProtocol proto = new TBinaryProtocol(tr);
- new Cassandra.Client(proto);
- tr.open();
- }
- catch (TTransportException e) {
- retry++;
- if (retry >= THRIFT_RETRIES) {
- throw e;
- }
- try {
- Thread.sleep(THRIFT_TIMEOUT);
- }
- catch (InterruptedException e1) {
- m_logService.log(LogService.LOG_DEBUG, "Wait for Thrift
connection interrupted");
+ while (retry < THRIFT_RETRIES) {
+ try {
+ // Perform a classloader switch to prevent log4j trying to
load classes from the system classloader
+ // instead of the bundle classloader.
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ String clusterName = m_configuration.get(CLUSTER_NAME,
String.class);
+ cluster = new ThriftCluster(clusterName,
m_configuration.getCassandraHostConfigurator());
+ cluster.describeKeyspaces();
+ m_logService.log(LogService.LOG_INFO, "Thrift connection
established successfully");
+ return true;
+ }
+ catch (Exception e) {
+ retry++;
+ Thread.sleep(THRIFT_TIMEOUT);
+ }
+ finally {
+ cluster.getConnectionManager().shutdown();
+ }
}
- m_logService.log(LogService.LOG_INFO, "Thrift connection cannot
yet be established, retrying... ("
- + retry + ")");
}
- m_logService.log(LogService.LOG_INFO, "Thrift connection established
successfully");
+ catch (Exception e) {}
+ finally {
+ Thread.currentThread().setContextClassLoader(oldClassLoader);
+ }
+ return false;
}
-
}
_______________________________________________
Amdatu-commits mailing list
[email protected]
http://lists.amdatu.org/mailman/listinfo/amdatu-commits