Add ability to use custom TServerFactory implementations
patch by Jason Brown; reviewed by Pavel Yaskevich for CASSANDRA-4608


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8264eb21
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8264eb21
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8264eb21

Branch: refs/heads/trunk
Commit: 8264eb21ccb20423ff7bdae0fbef6d88fe2b2529
Parents: 6eafeb2
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Fri Oct 5 14:11:13 2012 -0700
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Fri Oct 5 14:12:35 2012 -0700

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    5 +-
 .../cassandra/config/DatabaseDescriptor.java       |    2 -
 .../apache/cassandra/thrift/CassandraDaemon.java   |  120 ++-------------
 .../apache/cassandra/thrift/CustomTHsHaServer.java |   39 +++++
 .../cassandra/thrift/CustomTNonBlockingServer.java |   31 ++++
 .../cassandra/thrift/CustomTThreadPoolServer.java  |   32 ++++
 .../cassandra/thrift/TServerCustomFactory.java     |   75 +++++++++
 .../apache/cassandra/thrift/TServerFactory.java    |   43 +++++
 9 files changed, 241 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c680f03..4d2fd27 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -19,6 +19,7 @@
  * Pluggable Thrift transport factories for CLI (CASSANDRA-4609)
  * Backport adding AlterKeyspace statement (CASSANDRA-4611)
  * (CQL3) Correcty accept upper-case data types (CASSANDRA-4770)
+ * Add ability to use custom TServerFactory implementations (CASSANDRA-4608)
 Merged from 1.0:
  * Switch from NBHM to CHM in MessagingService's callback map, which
    prevents OOM in long-running instances (CASSANDRA-4708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c4732db..5e0be98 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -287,7 +287,7 @@ rpc_port: 9160
 # enable or disable keepalive on rpc connections
 rpc_keepalive: true
 
-# Cassandra provides three options for the RPC Server:
+# Cassandra provides three out-of-the-box options for the RPC Server:
 #
 # sync  -> One connection per thread in the rpc pool (see below).
 #          For a very large number of clients, memory will be your limiting
@@ -305,6 +305,9 @@ rpc_keepalive: true
 #
 # The default is sync because on Windows hsha is about 30% slower.  On Linux,
 # sync/hsha performance is about the same, with hsha of course using less 
memory.
+#
+# Alternatively,  can provide your own RPC server by providing the 
fully-qualified class name
+# of an o.a.c.t.TServerFactory that can create an instance of it.
 rpc_server_type: sync
 
 # Uncomment rpc_min|max|thread to set request pool size.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 20fa981..7ed6170 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -381,8 +381,6 @@ public class DatabaseDescriptor
             if (conf.stream_throughput_outbound_megabits_per_sec == null)
                 conf.stream_throughput_outbound_megabits_per_sec = 400;
 
-            if 
(!CassandraDaemon.rpc_server_types.contains(conf.rpc_server_type.toLowerCase()))
-                throw new ConfigurationException("Unknown rpc_server_type: " + 
conf.rpc_server_type);
             if (conf.rpc_min_threads == null)
                 conf.rpc_min_threads = 
conf.rpc_server_type.toLowerCase().equals("hsha")
                                      ? 
Runtime.getRuntime().availableProcessors() * 4

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
index 7153c08..2decb8e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
@@ -20,28 +20,14 @@ package org.apache.cassandra.thrift;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.cassandra.service.AbstractCassandraDaemon;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TThreadPoolServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
 
 /**
  * This class supports two methods for creating a Cassandra node daemon,
@@ -62,10 +48,9 @@ public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassan
     }
 
     private static Logger logger = 
LoggerFactory.getLogger(CassandraDaemon.class);
-    private final static String SYNC = "sync";
-    private final static String ASYNC = "async";
-    private final static String HSHA = "hsha";
-    public final static List<String> rpc_server_types = Arrays.asList(SYNC, 
ASYNC, HSHA);
+    final static String SYNC = "sync";
+    final static String ASYNC = "async";
+    final static String HSHA = "hsha";
     private ThriftServer server;
 
     protected void startServer()
@@ -117,94 +102,21 @@ public class CassandraDaemon extends 
org.apache.cassandra.service.AbstractCassan
         public ThriftServer(InetAddress listenAddr, int listenPort)
         {
             // now we start listening for clients
-            final CassandraServer cassandraServer = new CassandraServer();
-            Cassandra.Processor processor = new 
Cassandra.Processor(cassandraServer);
-
-            // Transport
             logger.info(String.format("Binding thrift service to %s:%s", 
listenAddr, listenPort));
 
-            // Protocol factory
-            TProtocolFactory tProtocolFactory = new 
TBinaryProtocol.Factory(true, true, 
DatabaseDescriptor.getThriftMaxMessageLength());
-
-            // Transport factory
+            TServerFactory.Args args = new TServerFactory.Args();
+            args.tProtocolFactory = new TBinaryProtocol.Factory(true, true, 
DatabaseDescriptor.getThriftMaxMessageLength());
+            args.addr = new InetSocketAddress(listenAddr, listenPort);
+            args.cassandraServer = new CassandraServer();
+            args.processor = new Cassandra.Processor(args.cassandraServer);
+            args.keepAlive = DatabaseDescriptor.getRpcKeepAlive();
+            args.sendBufferSize = DatabaseDescriptor.getRpcSendBufferSize();
+            args.recvBufferSize = DatabaseDescriptor.getRpcRecvBufferSize();
             int tFramedTransportSize = 
DatabaseDescriptor.getThriftFramedTransportSize();
-            TTransportFactory inTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
-            TTransportFactory outTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
-            logger.info("Using TFastFramedTransport with a max frame size of 
{} bytes.", tFramedTransportSize);
-
-            if (DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(SYNC))
-            {
-                TServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
-                                                              
DatabaseDescriptor.getRpcKeepAlive(),
-                                                              
DatabaseDescriptor.getRpcSendBufferSize(),
-                                                              
DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
-                }
-                // ThreadPool Server and will be invocation per connection 
basis...
-                TThreadPoolServer.Args serverArgs = new 
TThreadPoolServer.Args(serverTransport)
-                                                                         
.minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
-                                                                         
.maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
-                                                                         
.inputTransportFactory(inTransportFactory)
-                                                                         
.outputTransportFactory(outTransportFactory)
-                                                                         
.inputProtocolFactory(tProtocolFactory)
-                                                                         
.outputProtocolFactory(tProtocolFactory)
-                                                                         
.processor(processor);
-                ExecutorService executorService = new 
CleaningThreadPool(cassandraServer.clientState, serverArgs.minWorkerThreads, 
serverArgs.maxWorkerThreads);
-                serverEngine = new CustomTThreadPoolServer(serverArgs, 
executorService);
-                logger.info(String.format("Using synchronous/threadpool thrift 
server on %s : %s", listenAddr, listenPort));
-            }
-            else
-            {
-                TNonblockingServerTransport serverTransport;
-                try
-                {
-                    serverTransport = new TCustomNonblockingServerSocket(new 
InetSocketAddress(listenAddr, listenPort),
-                                                                             
DatabaseDescriptor.getRpcKeepAlive(),
-                                                                             
DatabaseDescriptor.getRpcSendBufferSize(),
-                                                                             
DatabaseDescriptor.getRpcRecvBufferSize());
-                }
-                catch (TTransportException e)
-                {
-                    throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", listenAddr, listenPort), e);
-                }
-
-                if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(ASYNC))
-                {
-                    // This is single threaded hence the invocation will be all
-                    // in one thread.
-                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                               
                      .outputTransportFactory(outTransportFactory)
-                                                                               
                      .inputProtocolFactory(tProtocolFactory)
-                                                                               
                      .outputProtocolFactory(tProtocolFactory)
-                                                                               
                      .processor(processor);
-                    logger.info(String.format("Using non-blocking/asynchronous 
thrift server on %s : %s", listenAddr, listenPort));
-                    serverEngine = new CustomTNonBlockingServer(serverArgs);
-                }
-                else if 
(DatabaseDescriptor.getRpcServerType().equalsIgnoreCase(HSHA))
-                {
-                    // This is NIO selector service but the invocation will be 
Multi-Threaded with the Executor service.
-                    ExecutorService executorService = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
-                                                                               
        DatabaseDescriptor.getRpcMaxThreads(),
-                                                                               
        60L,
-                                                                               
        TimeUnit.SECONDS,
-                                                                               
        new SynchronousQueue<Runnable>(),
-                                                                               
        new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
-                    TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(inTransportFactory)
-                                                                               
        .outputTransportFactory(outTransportFactory)
-                                                                               
        .inputProtocolFactory(tProtocolFactory)
-                                                                               
        .outputProtocolFactory(tProtocolFactory)
-                                                                               
        .processor(processor);
-                    logger.info(String.format("Using custom 
half-sync/half-async thrift server on %s : %s", listenAddr, listenPort));
-                    // Check for available processors in the system which will 
be equal to the IO Threads.
-                    serverEngine = new CustomTHsHaServer(serverArgs, 
executorService, Runtime.getRuntime().availableProcessors());
-                }
-            }
+            logger.info("Using TFramedTransport with a max frame size of {} 
bytes.", tFramedTransportSize);
+            args.inTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            args.outTransportFactory = new 
TFramedTransport.Factory(tFramedTransportSize);
+            serverEngine = new 
TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
         }
 
         public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java 
b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
index 350a13d..6ade5ca 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTHsHaServer.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.thrift;
 
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
@@ -30,9 +31,15 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
 import org.apache.thrift.transport.TNonblockingTransport;
@@ -343,4 +350,36 @@ public class CustomTHsHaServer extends TNonblockingServer
         // thread because the method is not synchronized with the rest of the
         // selectors threads.
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TNonblockingServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomNonblockingServerSocket(addr, 
args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+
+            // This is NIO selector service but the invocation will be 
Multi-Threaded with the Executor service.
+            ExecutorService executorService = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getRpcMinThreads(),
+                                                                               
DatabaseDescriptor.getRpcMaxThreads(),
+                                                                               
60L,
+                                                                               
TimeUnit.SECONDS,
+                                                                               
new SynchronousQueue<Runnable>(),
+                                                                               
new NamedThreadFactory("RPC-Thread"), "RPC-THREAD-POOL");
+            TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+                                                                               
.outputTransportFactory(args.outTransportFactory)
+                                                                               
.inputProtocolFactory(args.tProtocolFactory)
+                                                                               
.outputProtocolFactory(args.tProtocolFactory)
+                                                                               
.processor(args.processor);
+            // Check for available processors in the system which will be 
equal to the IO Threads.
+            return new CustomTHsHaServer(serverArgs, executorService, 
Runtime.getRuntime().availableProcessors());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java 
b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
index 0b6c90b..479fba8 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTNonBlockingServer.java
@@ -21,9 +21,14 @@ package org.apache.cassandra.thrift;
  */
 
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.service.SocketSessionManagementService;
 import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TTransportException;
 
 public class CustomTNonBlockingServer extends TNonblockingServer
 {
@@ -40,4 +45,30 @@ public class CustomTNonBlockingServer extends 
TNonblockingServer
         frameBuffer.invoke();
         return true;
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TNonblockingServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomNonblockingServerSocket(addr, 
args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+
+            // This is single threaded hence the invocation will be all
+            // in one thread.
+            TNonblockingServer.Args serverArgs = new 
TNonblockingServer.Args(serverTransport).inputTransportFactory(args.inTransportFactory)
+                                                                               
              .outputTransportFactory(args.outTransportFactory)
+                                                                               
              .inputProtocolFactory(args.tProtocolFactory)
+                                                                               
              .outputProtocolFactory(args.tProtocolFactory)
+                                                                               
              .processor(args.processor);
+            return new CustomTNonBlockingServer(serverArgs);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java 
b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
index d6ba012..fc07c60 100644
--- a/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
+++ b/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.thrift;
 
+import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -26,11 +27,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.AbstractCassandraDaemon;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
@@ -219,4 +223,32 @@ public class CustomTThreadPoolServer extends TServer
             }
         }
     }
+
+    public static class Factory implements TServerFactory
+    {
+        public TServer buildTServer(Args args)
+        {
+            final InetSocketAddress addr = args.addr;
+            TServerTransport serverTransport;
+            try
+            {
+                serverTransport = new TCustomServerSocket(addr, 
args.keepAlive, args.sendBufferSize, args.recvBufferSize);
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(String.format("Unable to create 
thrift socket to %s:%s", addr.getAddress(), addr.getPort()), e);
+            }
+            // ThreadPool Server and will be invocation per connection basis...
+            TThreadPoolServer.Args serverArgs = new 
TThreadPoolServer.Args(serverTransport)
+                                                                     
.minWorkerThreads(DatabaseDescriptor.getRpcMinThreads())
+                                                                     
.maxWorkerThreads(DatabaseDescriptor.getRpcMaxThreads())
+                                                                     
.inputTransportFactory(args.inTransportFactory)
+                                                                     
.outputTransportFactory(args.outTransportFactory)
+                                                                     
.inputProtocolFactory(args.tProtocolFactory)
+                                                                     
.outputProtocolFactory(args.tProtocolFactory)
+                                                                     
.processor(args.processor);
+            ExecutorService executorService = new 
AbstractCassandraDaemon.CleaningThreadPool(args.cassandraServer.clientState, 
serverArgs.minWorkerThreads, serverArgs.maxWorkerThreads);
+            return new CustomTThreadPoolServer(serverArgs, executorService);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java 
b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
new file mode 100644
index 0000000..50e4fac
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TServerCustomFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.thrift.server.TServer;
+
+/**
+ * Helper implementation to create a thrift TServer based on one of the common 
types we support (sync, async, hsha),
+ * or a custom type by setting the fully qualified java class name in the 
rpc_server_type setting.
+ */
+public class TServerCustomFactory implements TServerFactory
+{
+    private static Logger logger = 
LoggerFactory.getLogger(TServerCustomFactory.class);
+    private final String serverType;
+
+    public TServerCustomFactory(String serverType)
+    {
+        assert serverType != null;
+        this.serverType = serverType;
+    }
+
+    public TServer buildTServer(TServerFactory.Args args)
+    {
+        TServer server;
+        if (CassandraDaemon.SYNC.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTThreadPoolServer.Factory().buildTServer(args);
+            logger.info(String.format("Using synchronous/threadpool thrift 
server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else if(CassandraDaemon.ASYNC.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTNonBlockingServer.Factory().buildTServer(args);
+            logger.info(String.format("Using non-blocking/asynchronous thrift 
server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else if(CassandraDaemon.HSHA.equalsIgnoreCase(serverType))
+        {
+            server = new CustomTHsHaServer.Factory().buildTServer(args);
+            logger.info(String.format("Using custom half-sync/half-async 
thrift server on %s : %s", args.addr.getHostName(), args.addr.getPort()));
+        }
+        else
+        {
+            TServerFactory serverFactory;
+            try
+            {
+                serverFactory = (TServerFactory) 
Class.forName(serverType).newInstance();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException("Failed to instantiate server 
factory:" + serverType, e);
+            }
+            server = serverFactory.buildTServer(args);
+            logger.info(String.format("Using custom thrift server %s on %s : 
%s", server.getClass().getName(), args.addr.getHostName(), 
args.addr.getPort()));
+        }
+        return server;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8264eb21/src/java/org/apache/cassandra/thrift/TServerFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/TServerFactory.java 
b/src/java/org/apache/cassandra/thrift/TServerFactory.java
new file mode 100644
index 0000000..0c93867
--- /dev/null
+++ b/src/java/org/apache/cassandra/thrift/TServerFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.thrift;
+
+import java.net.InetSocketAddress;
+
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.transport.TTransportFactory;
+
+public interface TServerFactory
+{
+    TServer buildTServer(Args args);
+
+    public static class Args
+    {
+        public InetSocketAddress addr;
+        public CassandraServer cassandraServer;
+        public Cassandra.Processor processor;
+        public TProtocolFactory tProtocolFactory;
+        public TTransportFactory inTransportFactory;
+        public TTransportFactory outTransportFactory;
+        public Integer sendBufferSize;
+        public Integer recvBufferSize;
+        public boolean keepAlive;
+    }
+}

Reply via email to