Author: jbellis
Date: Tue Sep 14 03:20:44 2010
New Revision: 996750

URL: http://svn.apache.org/viewvc?rev=996750&view=rev
Log:
move common setup code into AbstractCassandraDaemon. patch by Amol Deshpande; reviewed by jbellis for CASSANDRA-1500

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Tue Sep 14 03:20:44 2010 @@ -19,28 +19,13 @@ package org.apache.cassandra.avro; import java.io.IOException; -import java.net.InetAddress; -import java.util.UUID; -import org.apache.avro.ipc.HttpServer; -import org.apache.avro.ipc.ResponderServlet; -import org.apache.avro.specific.SpecificResponder; - -import org.apache.cassandra.config.ConfigurationException; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.CompactionManager; -import org.apache.cassandra.db.SystemTable; -import org.apache.cassandra.db.Table; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.migration.Migration; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Mx4jTool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// see CASSANDRA-1440 +import org.apache.avro.ipc.ResponderServlet; +import org.apache.avro.specific.SpecificResponder; +import org.apache.cassandra.utils.Mx4jTool; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; @@ -51,85 +36,7 @@ import org.mortbay.jetty.servlet.Servlet public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassandraDaemon { private static Logger logger = LoggerFactory.getLogger(CassandraDaemon.class); private org.mortbay.jetty.Server server; - private InetAddress listenAddr; - private int listenPort; - - protected void setup() throws IOException - { - FBUtilities.tryMlockall(); - - listenPort = DatabaseDescriptor.getRpcPort(); - listenAddr = DatabaseDescriptor.getRpcAddress(); - - /* - * If ThriftAddress was left completely unconfigured, then assume - * the same default as ListenAddress - */ - if (listenAddr == null) - listenAddr = FBUtilities.getLocalAddress(); - - Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() - { - public void uncaughtException(Thread t, Throwable e) - { - logger.error("Fatal exception in thread " + t, e); - if (e instanceof OutOfMemoryError) - { - System.exit(100); - } - } - }); - - // check the system table for mismatched partitioner. - try - { - SystemTable.checkHealth(); - } - catch (ConfigurationException e) - { - logger.error("Fatal exception during initialization", e); - System.exit(100); - } - - try - { - DatabaseDescriptor.loadSchemas(); - } - catch (IOException e) - { - logger.error("Fatal exception during initialization", e); - System.exit(100); - } - - // initialize keyspaces - for (String table : DatabaseDescriptor.getTables()) - { - if (logger.isDebugEnabled()) - logger.debug("opening keyspace " + table); - Table.open(table); - } - // replay the log if necessary and check for compaction candidates - CommitLog.recover(); - CompactionManager.instance.checkAllColumnFamilies(); - - // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't - // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution - // is to read those migrations from disk and apply them. 
- UUID currentMigration = DatabaseDescriptor.getDefsVersion(); - UUID lastMigration = Migration.getLastMigrationId(); - if ((lastMigration != null) && (lastMigration.timestamp() > currentMigration.timestamp())) - { - MigrationManager.applyMigrations(currentMigration, lastMigration); - } - - SystemTable.purgeIncompatibleHints(); - - // start server internals - StorageService.instance.initServer(); - - } - /** hook for JSVC */ public void start() throws IOException { @@ -139,7 +46,6 @@ public class CassandraDaemon extends org SpecificResponder responder = new SpecificResponder(Cassandra.class, cassandraServer); logger.info("Listening for avro clients..."); - Mx4jTool.maybeLoad(); // FIXME: This isn't actually binding to listenAddr (it should). server = new org.mortbay.jetty.Server(listenPort); Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Tue Sep 14 03:20:44 2010 @@ -20,14 +20,25 @@ package org.apache.cassandra.service; import java.io.File; import java.io.IOException; +import java.net.InetAddress; +import java.util.UUID; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.RejectedExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.CompactionManager; +import 
org.apache.cassandra.db.SystemTable; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.migration.Migration; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Mx4jTool; import org.mortbay.thread.ThreadPool; /** @@ -42,15 +53,95 @@ public abstract class AbstractCassandraD private static Logger logger = LoggerFactory .getLogger(AbstractCassandraDaemon.class); + protected InetAddress listenAddr; + protected int listenPort; + public static final int MIN_WORKER_THREADS = 64; /** * This is a hook for concrete daemons to initialize themselves suitably. - * + * + * Subclasses should override this to finish the job (listening on ports, etc.) + * * @throws IOException */ - protected abstract void setup() throws IOException; - + protected void setup() throws IOException + { + FBUtilities.tryMlockall(); + + listenPort = DatabaseDescriptor.getRpcPort(); + listenAddr = DatabaseDescriptor.getRpcAddress(); + + /* + * If ThriftAddress was left completely unconfigured, then assume + * the same default as ListenAddress + */ + if (listenAddr == null) + listenAddr = FBUtilities.getLocalAddress(); + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() + { + public void uncaughtException(Thread t, Throwable e) + { + logger.error("Fatal exception in thread " + t, e); + if (e instanceof OutOfMemoryError) + { + System.exit(100); + } + } + }); + + // check the system table for mismatched partitioner. 
+ try + { + SystemTable.checkHealth(); + } + catch (ConfigurationException e) + { + logger.error("Fatal exception during initialization", e); + System.exit(100); + } + + try + { + DatabaseDescriptor.loadSchemas(); + } + catch (IOException e) + { + logger.error("Fatal exception during initialization", e); + System.exit(100); + } + + // initialize keyspaces + for (String table : DatabaseDescriptor.getTables()) + { + if (logger.isDebugEnabled()) + logger.debug("opening keyspace " + table); + Table.open(table); + } + + // replay the log if necessary and check for compaction candidates + CommitLog.recover(); + CompactionManager.instance.checkAllColumnFamilies(); + + // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't + // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution + // is to read those migrations from disk and apply them. + UUID currentMigration = DatabaseDescriptor.getDefsVersion(); + UUID lastMigration = Migration.getLastMigrationId(); + if ((lastMigration != null) && (lastMigration.timestamp() > currentMigration.timestamp())) + { + MigrationManager.applyMigrations(currentMigration, lastMigration); + } + + SystemTable.purgeIncompatibleHints(); + + // start server internals + StorageService.instance.initServer(); + + Mx4jTool.maybeLoad(); + } + /** * Initialize the Cassandra Daemon based on the given <a * href="http://commons.apache.org/daemon/jsvc.html">Commons @@ -155,7 +246,6 @@ public abstract class AbstractCassandraD /** The following are cribbed from org.mortbay.thread.concurrent */ /*********************************************************************/ - @Override public boolean dispatch(Runnable job) { try @@ -170,25 +260,21 @@ public abstract class AbstractCassandraD } } - @Override public int getIdleThreads() { return getPoolSize()-getActiveCount(); } - @Override public int getThreads() { return getPoolSize(); } - @Override public 
boolean isLowOnThreads() { return getActiveCount()>=getMaximumPoolSize(); } - @Override public void join() throws InterruptedException { this.awaitTermination(Long.MAX_VALUE,TimeUnit.MILLISECONDS); Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=996750&r1=996749&r2=996750&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java Tue Sep 14 03:20:44 2010 @@ -19,25 +19,13 @@ package org.apache.cassandra.thrift; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.util.UUID; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.cassandra.config.ConfigurationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.CompactionManager; -import org.apache.cassandra.db.SystemTable; -import org.apache.cassandra.db.Table; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.migration.Migration; -import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.CLibrary; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Mx4jTool; import org.apache.thrift.TProcessorFactory; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocolFactory; @@ -46,8 +34,6 @@ import org.apache.thrift.transport.TFram import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; import 
org.apache.thrift.transport.TTransportFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This class supports two methods for creating a Cassandra node daemon, @@ -65,78 +51,7 @@ public class CassandraDaemon extends org protected void setup() throws IOException { - FBUtilities.tryMlockall(); - - int listenPort = DatabaseDescriptor.getRpcPort(); - InetAddress listenAddr = DatabaseDescriptor.getRpcAddress(); - - /* - * If ThriftAddress was left completely unconfigured, then assume - * the same default as ListenAddress - */ - if (listenAddr == null) - listenAddr = FBUtilities.getLocalAddress(); - - Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() - { - public void uncaughtException(Thread t, Throwable e) - { - logger.error("Uncaught exception in thread " + t, e); - if (e instanceof OutOfMemoryError) - { - System.exit(100); - } - } - }); - - // check the system table for mismatched partitioner. - try - { - SystemTable.checkHealth(); - } - catch (ConfigurationException e) - { - logger.error("Fatal exception during initialization", e); - System.exit(100); - } - - try - { - DatabaseDescriptor.loadSchemas(); - } - catch (IOException e) - { - logger.error("Fatal exception during initialization", e); - System.exit(100); - } - - // initialize keyspaces - for (String table : DatabaseDescriptor.getTables()) - { - if (logger.isDebugEnabled()) - logger.debug("opening keyspace " + table); - Table.open(table); - } - - // replay the log if necessary and check for compaction candidates - CommitLog.recover(); - CompactionManager.instance.checkAllColumnFamilies(); - - // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re apply migrations. this isn't - // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution - // is to read those migrations from disk and apply them. 
- UUID currentMigration = DatabaseDescriptor.getDefsVersion(); - UUID lastMigration = Migration.getLastMigrationId(); - if ((lastMigration != null) && (lastMigration.timestamp() > currentMigration.timestamp())) - { - MigrationManager.applyMigrations(currentMigration, lastMigration); - } - - SystemTable.purgeIncompatibleHints(); - - // start server internals - StorageService.instance.initServer(); - + super.setup(); // now we start listening for clients final CassandraServer cassandraServer = new CassandraServer(); Cassandra.Processor processor = new Cassandra.Processor(cassandraServer); @@ -176,7 +91,6 @@ public class CassandraDaemon extends org outTransportFactory = new TTransportFactory(); } - // ThreadPool Server CustomTThreadPoolServer.Options options = new CustomTThreadPoolServer.Options(); options.minWorkerThreads = MIN_WORKER_THREADS; @@ -198,7 +112,6 @@ public class CassandraDaemon extends org public void start() { logger.info("Listening for thrift clients..."); - Mx4jTool.maybeLoad(); serverEngine.serve(); }