Author: jbellis Date: Thu Mar 3 23:49:44 2011 New Revision: 1076891 URL: http://svn.apache.org/viewvc?rev=1076891&view=rev Log: reformat
Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Modified: cassandra/branches/cassandra-0.6/interface/cassandra.thrift URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/interface/cassandra.thrift?rev=1076891&r1=1076890&r2=1076891&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/interface/cassandra.thrift (original) +++ cassandra/branches/cassandra-0.6/interface/cassandra.thrift Thu Mar 3 23:49:44 2011 @@ -300,7 +300,7 @@ service Cassandra { ColumnOrSuperColumn get(1:required string keyspace, 2:required string key, 3:required ColumnPath column_path, - 4:required ConsistencyLevel consistency_level=ONE) + 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:NotFoundException nfe, 3:UnavailableException ue, 4:TimedOutException te), /** @@ -311,7 +311,7 @@ service Cassandra { 2:required string key, 3:required ColumnParent column_parent, 4:required SlicePredicate predicate, - 5:required ConsistencyLevel consistency_level=ONE) + 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -323,7 +323,7 @@ service Cassandra { map<string,ColumnOrSuperColumn> multiget(1:required string keyspace, 2:required list<string> keys, 3:required ColumnPath column_path, - 4:required ConsistencyLevel consistency_level=ONE) + 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -333,7 +333,7 @@ service Cassandra { 2:required list<string> keys, 3:required ColumnParent column_parent, 4:required SlicePredicate predicate, - 5:required ConsistencyLevel consistency_level=ONE) + 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -342,7 +342,7 @@ service Cassandra { i32 get_count(1:required string keyspace, 2:required string key, 3:required ColumnParent column_parent, - 4:required ConsistencyLevel consistency_level=ONE) + 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -355,7 +355,7 @@ service Cassandra { 4:required string start_key="", 5:required string finish_key="", 6:required i32 row_count=100, - 7:required ConsistencyLevel consistency_level=ONE) + 7:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -365,7 +365,7 @@ service Cassandra { 2:required ColumnParent column_parent, 3:required SlicePredicate predicate, 4:required KeyRange range, - 5:required ConsistencyLevel consistency_level=ONE) + 5:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), # modification methods @@ -380,7 +380,7 @@ service Cassandra { 3:required ColumnPath column_path, 4:required binary value, 5:required i64 timestamp, - 6:required ConsistencyLevel consistency_level=ONE) + 6:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -392,7 +392,7 @@ service Cassandra { void batch_insert(1:required string keyspace, 2:required string key, 3:required map<string, list<ColumnOrSuperColumn>> cfmap, - 4:required ConsistencyLevel consistency_level=ONE) + 4:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -404,7 +404,7 @@ service Cassandra { 2:required string key, 3:required ColumnPath column_path, 4:required i64 timestamp, - 5:ConsistencyLevel consistency_level=ONE) + 5:ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), /** @@ -414,7 +414,7 @@ service Cassandra { **/ void batch_mutate(1:required string keyspace, 2:required map<string, map<string, list<Mutation>>> mutation_map, - 3:required ConsistencyLevel consistency_level=ONE) + 3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE) throws (1:InvalidRequestException ire, 2:UnavailableException ue, 3:TimedOutException te), // Meta-APIs -- APIs to get information about the node or cluster, Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1076891&r1=1076890&r2=1076891&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java Thu Mar 3 23:49:44 2011 @@ -39,147 +39,178 @@ import org.apache.thrift.transport.TTran /** * Slightly modified version of the Apache Thrift TThreadPoolServer. - * + * <p/> * This allows passing an executor so you have more control over the actual * behaviour of the tasks being run. - * + * <p/> * Newer version of Thrift should make this obsolete. */ -public class CustomTThreadPoolServer extends TServer { +public class CustomTThreadPoolServer extends TServer +{ -private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName()); + private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName()); -// Executor service for handling client connections -private ExecutorService executorService_; + // Executor service for handling client connections + private ExecutorService executorService_; -// Flag for stopping the server -private volatile boolean stopped_; - -// Server options -private Options options_; - -// Customizable server options -public static class Options { - public int minWorkerThreads = 5; - public int maxWorkerThreads = Integer.MAX_VALUE; - public int stopTimeoutVal = 60; - public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; -} - - -public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory, - TServerSocket tServerSocket, - TTransportFactory inTransportFactory, - TTransportFactory outTransportFactory, - TProtocolFactory tProtocolFactory, - TProtocolFactory tProtocolFactory2, - Options options, - ExecutorService executorService) { - - super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory, - tProtocolFactory, tProtocolFactory2); - options_ = options; - executorService_ = executorService; -} - - -public void serve() { - try { - serverTransport_.listen(); - } catch (TTransportException ttx) { - LOGGER.error("Error occurred during listening.", ttx); - return; - } - - stopped_ = false; - while (!stopped_) { - int failureCount = 0; - try { - TTransport client = serverTransport_.accept(); - WorkerProcess wp = new WorkerProcess(client); - executorService_.execute(wp); - } catch (TTransportException ttx) { - if (!stopped_) { - ++failureCount; - LOGGER.warn("Transport error occurred during acceptance of message.", ttx); - } - } - } - - executorService_.shutdown(); - - // Loop until awaitTermination finally does return without a interrupted - // exception. If we don't do this, then we'll shut down prematurely. We want - // to let the executorService clear it's task queue, closing client sockets - // appropriately. - long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); - long now = System.currentTimeMillis(); - while (timeoutMS >= 0) { - try { - executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); - break; - } catch (InterruptedException ix) { - long newnow = System.currentTimeMillis(); - timeoutMS -= (newnow - now); - now = newnow; - } - } -} - -public void stop() { - stopped_ = true; - serverTransport_.interrupt(); -} - -private class WorkerProcess implements Runnable { - - /** - * Client that this services. - */ - private TTransport client_; - - /** - * Default constructor. - * - * @param client Transport to process - */ - private WorkerProcess(TTransport client) { - client_ = client; - } - - /** - * Loops on processing a client forever - */ - public void run() { - TProcessor processor = null; - TTransport inputTransport = null; - TTransport outputTransport = null; - TProtocol inputProtocol = null; - TProtocol outputProtocol = null; - try { - processor = processorFactory_.getProcessor(client_); - inputTransport = inputTransportFactory_.getTransport(client_); - outputTransport = outputTransportFactory_.getTransport(client_); - inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); - // we check stopped_ first to make sure we're not supposed to be shutting - // down. this is necessary for graceful shutdown. - while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {} - } catch (TTransportException ttx) { - // Assume the client died and continue silently - } catch (TException tx) { - LOGGER.error("Thrift error occurred during processing of message.", tx); - } catch (Exception x) { - LOGGER.error("Error occurred during processing of message.", x); - } - - if (inputTransport != null) { - inputTransport.close(); - } - - if (outputTransport != null) { - outputTransport.close(); - } - } -} + // Flag for stopping the server + private volatile boolean stopped_; + + // Server options + private Options options_; + + // Customizable server options + public static class Options + { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } + + + public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory, + TServerSocket tServerSocket, + TTransportFactory inTransportFactory, + TTransportFactory outTransportFactory, + TProtocolFactory tProtocolFactory, + TProtocolFactory tProtocolFactory2, + Options options, + ExecutorService executorService) + { + + super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory, + tProtocolFactory, tProtocolFactory2); + options_ = options; + executorService_ = executorService; + } + + + public void serve() + { + try + { + serverTransport_.listen(); + } + catch (TTransportException ttx) + { + LOGGER.error("Error occurred during listening.", ttx); + return; + } + + stopped_ = false; + while (!stopped_) + { + int failureCount = 0; + try + { + TTransport client = serverTransport_.accept(); + WorkerProcess wp = new WorkerProcess(client); + executorService_.execute(wp); + } + catch (TTransportException ttx) + { + if (!stopped_) + { + ++failureCount; + LOGGER.warn("Transport error occurred during acceptance of message.", ttx); + } + } + } + + executorService_.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) + { + try + { + executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } + catch (InterruptedException ix) + { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + public void stop() + { + stopped_ = true; + serverTransport_.interrupt(); + } + + private class WorkerProcess implements Runnable + { + + /** + * Client that this services. + */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) + { + client_ = client; + } + + /** + * Loops on processing a client forever + */ + public void run() + { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + try + { + processor = processorFactory_.getProcessor(client_); + inputTransport = inputTransportFactory_.getTransport(client_); + outputTransport = outputTransportFactory_.getTransport(client_); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + // we check stopped_ first to make sure we're not supposed to be shutting + // down. this is necessary for graceful shutdown. + while (!stopped_ && processor.process(inputProtocol, outputProtocol)) + { + } + } + catch (TTransportException ttx) + { + // Assume the client died and continue silently + } + catch (TException tx) + { + LOGGER.error("Thrift error occurred during processing of message.", tx); + } + catch (Exception x) + { + LOGGER.error("Error occurred during processing of message.", x); + } + + if (inputTransport != null) + { + inputTransport.close(); + } + + if (outputTransport != null) + { + outputTransport.close(); + } + } + } }