Author: hairong Date: Thu Nov 25 04:36:17 2010 New Revision: 1038918 URL: http://svn.apache.org/viewvc?rev=1038918&view=rev Log: HADOOP-6764. Add number of reader threads and queue length as configuration parameters in RPC.getServer. Contributed by Dmytro Molkov.
Modified: hadoop/common/trunk/CHANGES.txt hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Modified: hadoop/common/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/CHANGES.txt (original) +++ hadoop/common/trunk/CHANGES.txt Thu Nov 25 04:36:17 2010 @@ -15,7 +15,10 @@ Trunk (unreleased changes) improve other messaging. (nigel) HADOOP-7001. Configuration changes can occur via the Reconfigurable - interface. (Patrick Kline via dhruba) + interface. (Patrick Kling via dhruba) + + HADOOP-6764. Add number of reader threads and queue length as + configuration parameters in RPC.getServer. (Dmytro Molkov via hairong) OPTIMIZATIONS Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu Nov 25 04:36:17 2010 @@ -211,14 +211,15 @@ public class AvroRpcEngine implements Rp /** Construct a server for a protocol implementation instance listening on a * port and address. */ public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress, - int port, int numHandlers, boolean verbose, + int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager ) throws IOException { return ENGINE.getServer(TunnelProtocol.class, new TunnelResponder(iface, impl), - bindAddress, port, numHandlers, verbose, conf, - secretManager); + bindAddress, port, numHandlers, numReaders, + queueSizePerHandler, verbose, conf, secretManager); } } Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Thu Nov 25 04:36:17 2010 @@ -380,18 +380,33 @@ public class RPC { throws IOException { return getProtocolEngine(protocol, conf) - .getServer(protocol, instance, bindAddress, port, numHandlers, verbose, - conf, secretManager); + .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1, + verbose, conf, secretManager); + } + + /** Construct a server for a protocol implementation instance. */ + public static Server getServer(Class<?> protocol, + Object instance, String bindAddress, int port, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, + SecretManager<? extends TokenIdentifier> secretManager) + throws IOException { + + return getProtocolEngine(protocol, conf) + .getServer(protocol, instance, bindAddress, port, numHandlers, + numReaders, queueSizePerHandler, verbose, conf, secretManager); } /** An RPC Server. */ public abstract static class Server extends org.apache.hadoop.ipc.Server { protected Server(String bindAddress, int port, - Class<? extends Writable> paramClass, int handlerCount, + Class<? extends Writable> paramClass, int handlerCount, + int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { - super(bindAddress, port, paramClass, handlerCount, conf, serverName, secretManager); + super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler, + conf, serverName, secretManager); } } Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu Nov 25 04:36:17 2010 @@ -50,7 +50,8 @@ public interface RpcEngine { /** Construct a server for a protocol implementation instance. */ RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, - int port, int numHandlers, boolean verbose, + int port, int numHandlers, int numReaders, + int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager ) throws IOException; Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Nov 25 04:36:17 2010 @@ -1451,16 +1451,18 @@ public abstract class Server { Configuration conf) throws IOException { - this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null); + this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null); } + /** Constructs a server listening on the named port and address. Parameters passed must * be of the named class. The <code>handlerCount</handlerCount> determines * the number of handler threads that will be used to process calls. - * + * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters + * from configuration. Otherwise the configuration will be picked up. */ @SuppressWarnings("unchecked") protected Server(String bindAddress, int port, - Class<? extends Writable> paramClass, int handlerCount, + Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { this.bindAddress = bindAddress; @@ -1469,15 +1471,23 @@ public abstract class Server { this.paramClass = paramClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; - this.maxQueueSize = handlerCount * conf.getInt( - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + if (queueSizePerHandler != -1) { + this.maxQueueSize = queueSizePerHandler; + } else { + this.maxQueueSize = handlerCount * conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + } this.maxRespSize = conf.getInt( CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); - this.readThreads = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + if (numReaders != -1) { + this.readThreads = numReaders; + } else { + this.readThreads = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + } this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10); @@ -1691,7 +1701,22 @@ public abstract class Server { return callQueue.size(); } - + /** + * The maximum size of the rpc call queue of this server. + * @return The maximum size of the rpc call queue. + */ + public int getMaxQueueSize() { + return maxQueueSize; + } + + /** + * The number of reader threads for this server. + * @return The number of reader threads. + */ + public int getNumReaders() { + return readThreads; + } + /** * When the read or write buffer size is larger than this limit, i/o will be * done in chunks of this size. Most RPC requests and responses would be Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java (original) +++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java Thu Nov 25 04:36:17 2010 @@ -285,11 +285,12 @@ public class WritableRpcEngine implement * port and address. */ public Server getServer(Class<?> protocol, Object instance, String bindAddress, int port, - int numHandlers, boolean verbose, Configuration conf, + int numHandlers, int numReaders, int queueSizePerHandler, + boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, - verbose, secretManager); + numReaders, queueSizePerHandler, verbose, secretManager); } /** An RPC Server. */ @@ -305,7 +306,7 @@ public class WritableRpcEngine implement */ public Server(Object instance, Configuration conf, String bindAddress, int port) throws IOException { - this(instance, conf, bindAddress, port, 1, false, null); + this(instance, conf, bindAddress, port, 1, -1, -1, false, null); } private static String classNameBase(String className) { @@ -325,10 +326,11 @@ public class WritableRpcEngine implement * @param verbose whether each call should be logged */ public Server(Object instance, Configuration conf, String bindAddress, int port, - int numHandlers, boolean verbose, + int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { - super(bindAddress, port, Invocation.class, numHandlers, conf, + super(bindAddress, port, Invocation.class, numHandlers, numReaders, + queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager); this.instance = instance; this.verbose = verbose; Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff ============================================================================== --- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Thu Nov 25 04:36:17 2010 @@ -190,6 +190,28 @@ public class TestRPC extends TestCase { } } } + + public void testConfRpc() throws Exception { + Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 1, false, conf, null); + // Just one handler + int confQ = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + assertEquals(confQ, server.getMaxQueueSize()); + + int confReaders = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + assertEquals(confReaders, server.getNumReaders()); + server.stop(); + + server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null); + assertEquals(3, server.getNumReaders()); + assertEquals(200, server.getMaxQueueSize()); + server.stop(); + } public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); @@ -234,6 +256,10 @@ public class TestRPC extends TestCase { System.out.println("Down slow rpc testing"); } } + + public void testRPCConf(Configuration conf) throws Exception { + + } public void testCalls(Configuration conf) throws Exception { Server server = RPC.getServer(TestProtocol.class,