Author: hairong
Date: Thu Nov 25 04:36:17 2010
New Revision: 1038918

URL: http://svn.apache.org/viewvc?rev=1038918&view=rev
Log:
HADOOP-6764. Add number of reader threads and queue length as configuration 
parameters in RPC.getServer. Contributed by Dmytro Molkov.

Modified:
    hadoop/common/trunk/CHANGES.txt
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
    hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Thu Nov 25 04:36:17 2010
@@ -15,7 +15,10 @@ Trunk (unreleased changes)
     improve other messaging. (nigel)
 
     HADOOP-7001.  Configuration changes can occur via the Reconfigurable
-    interface. (Patrick Kline via dhruba)
+    interface. (Patrick Kling via dhruba)
+
+    HADOOP-6764. Add number of reader threads and queue length as
+    configuration parameters in RPC.getServer. (Dmytro Molkov via hairong)
 
   OPTIMIZATIONS
 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java 
(original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/AvroRpcEngine.java Thu 
Nov 25 04:36:17 2010
@@ -211,14 +211,15 @@ public class AvroRpcEngine implements Rp
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
   public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
-                              int port, int numHandlers, boolean verbose,
+                              int port, int numHandlers, int numReaders,
+                              int queueSizePerHandler, boolean verbose,
                               Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager
                               ) throws IOException {
     return ENGINE.getServer(TunnelProtocol.class,
                             new TunnelResponder(iface, impl),
-                            bindAddress, port, numHandlers, verbose, conf, 
-                            secretManager);
+                            bindAddress, port, numHandlers, numReaders,
+                            queueSizePerHandler, verbose, conf, secretManager);
   }
 
 }

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RPC.java Thu Nov 25 
04:36:17 2010
@@ -380,18 +380,33 @@ public class RPC {
     throws IOException {
     
     return getProtocolEngine(protocol, conf)
-      .getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
-                 conf, secretManager);
+      .getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
+                 verbose, conf, secretManager);
+  }
+
+  /** Construct a server for a protocol implementation instance. */
+  public static Server getServer(Class<?> protocol,
+                                 Object instance, String bindAddress, int port,
+                                 int numHandlers, int numReaders, int 
queueSizePerHandler,
+                                 boolean verbose, Configuration conf,
+                                 SecretManager<? extends TokenIdentifier> 
secretManager) 
+    throws IOException {
+    
+    return getProtocolEngine(protocol, conf)
+      .getServer(protocol, instance, bindAddress, port, numHandlers,
+                 numReaders, queueSizePerHandler, verbose, conf, 
secretManager);
   }
 
   /** An RPC Server. */
   public abstract static class Server extends org.apache.hadoop.ipc.Server {
   
     protected Server(String bindAddress, int port, 
-                     Class<? extends Writable> paramClass, int handlerCount, 
+                     Class<? extends Writable> paramClass, int handlerCount,
+                     int numReaders, int queueSizePerHandler,
                      Configuration conf, String serverName, 
                      SecretManager<? extends TokenIdentifier> secretManager) 
throws IOException {
-      super(bindAddress, port, paramClass, handlerCount, conf, serverName, 
secretManager);
+      super(bindAddress, port, paramClass, handlerCount, numReaders, 
queueSizePerHandler,
+            conf, serverName, secretManager);
     }
   }
 

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/RpcEngine.java Thu Nov 
25 04:36:17 2010
@@ -50,7 +50,8 @@ public interface RpcEngine {
 
   /** Construct a server for a protocol implementation instance. */
   RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
-                       int port, int numHandlers, boolean verbose,
+                       int port, int numHandlers, int numReaders,
+                       int queueSizePerHandler, boolean verbose,
                        Configuration conf, 
                        SecretManager<? extends TokenIdentifier> secretManager
                        ) throws IOException;

Modified: hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Nov 25 
04:36:17 2010
@@ -1451,16 +1451,18 @@ public abstract class Server {
                   Configuration conf)
     throws IOException 
   {
-    this(bindAddress, port, paramClass, handlerCount,  conf, 
Integer.toString(port), null);
+    this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, 
Integer.toString(port), null);
   }
+  
   /** Constructs a server listening on the named port and address.  Parameters 
passed must
    * be of the named class.  The <code>handlerCount</handlerCount> determines
    * the number of handler threads that will be used to process calls.
-   * 
+   * If queueSizePerHandler or numReaders are not -1 they will be used instead 
of parameters
+   * from configuration. Otherwise the configuration will be picked up.
    */
   @SuppressWarnings("unchecked")
   protected Server(String bindAddress, int port, 
-                  Class<? extends Writable> paramClass, int handlerCount, 
+                  Class<? extends Writable> paramClass, int handlerCount, int 
numReaders, int queueSizePerHandler,
                   Configuration conf, String serverName, SecretManager<? 
extends TokenIdentifier> secretManager) 
     throws IOException {
     this.bindAddress = bindAddress;
@@ -1469,15 +1471,23 @@ public abstract class Server {
     this.paramClass = paramClass;
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
-    this.maxQueueSize = handlerCount * conf.getInt(
-        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
-        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+    if (queueSizePerHandler != -1) {
+      this.maxQueueSize = queueSizePerHandler;
+    } else {
+      this.maxQueueSize = handlerCount * conf.getInt(
+          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+          CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);      
+    }
     this.maxRespSize = conf.getInt(
         CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
         CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
-    this.readThreads = conf.getInt(
-        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
-        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+    if (numReaders != -1) {
+      this.readThreads = numReaders;
+    } else {
+      this.readThreads = conf.getInt(
+          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+          CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);      
+    }
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 
1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@@ -1691,7 +1701,22 @@ public abstract class Server {
     return callQueue.size();
   }
   
-  
+  /**
+   * The maximum size of the rpc call queue of this server.
+   * @return The maximum size of the rpc call queue.
+   */
+  public int getMaxQueueSize() {
+    return maxQueueSize;
+  }
+
+  /**
+   * The number of reader threads for this server.
+   * @return The number of reader threads.
+   */
+  public int getNumReaders() {
+    return readThreads;
+  }
+
   /**
    * When the read or write buffer size is larger than this limit, i/o will be 
    * done in chunks of this size. Most RPC requests and responses would be

Modified: 
hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java 
(original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/ipc/WritableRpcEngine.java 
Thu Nov 25 04:36:17 2010
@@ -285,11 +285,12 @@ public class WritableRpcEngine implement
    * port and address. */
   public Server getServer(Class<?> protocol,
                           Object instance, String bindAddress, int port,
-                          int numHandlers, boolean verbose, Configuration conf,
+                          int numHandlers, int numReaders, int 
queueSizePerHandler,
+                          boolean verbose, Configuration conf,
                       SecretManager<? extends TokenIdentifier> secretManager) 
     throws IOException {
     return new Server(instance, conf, bindAddress, port, numHandlers, 
-        verbose, secretManager);
+        numReaders, queueSizePerHandler, verbose, secretManager);
   }
 
   /** An RPC Server. */
@@ -305,7 +306,7 @@ public class WritableRpcEngine implement
      */
     public Server(Object instance, Configuration conf, String bindAddress, int 
port) 
       throws IOException {
-      this(instance, conf,  bindAddress, port, 1, false, null);
+      this(instance, conf,  bindAddress, port, 1, -1, -1, false, null);
     }
     
     private static String classNameBase(String className) {
@@ -325,10 +326,11 @@ public class WritableRpcEngine implement
      * @param verbose whether each call should be logged
      */
     public Server(Object instance, Configuration conf, String bindAddress,  
int port,
-                  int numHandlers, boolean verbose, 
+                  int numHandlers, int numReaders, int queueSizePerHandler, 
boolean verbose, 
                   SecretManager<? extends TokenIdentifier> secretManager) 
         throws IOException {
-      super(bindAddress, port, Invocation.class, numHandlers, conf, 
+      super(bindAddress, port, Invocation.class, numHandlers, numReaders,
+          queueSizePerHandler, conf,
           classNameBase(instance.getClass().getName()), secretManager);
       this.instance = instance;
       this.verbose = verbose;

Modified: hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1038918&r1=1038917&r2=1038918&view=diff
==============================================================================
--- hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java 
(original)
+++ hadoop/common/trunk/src/test/core/org/apache/hadoop/ipc/TestRPC.java Thu 
Nov 25 04:36:17 2010
@@ -190,6 +190,28 @@ public class TestRPC extends TestCase {
       }
     }
   }
+  
+  public void testConfRpc() throws Exception {
+    Server server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, 1, false, conf, 
null);
+    // Just one handler
+    int confQ = conf.getInt(
+              CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+              CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+    assertEquals(confQ, server.getMaxQueueSize());
+
+    int confReaders = conf.getInt(
+              CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+              CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+    assertEquals(confReaders, server.getNumReaders());
+    server.stop();
+    
+    server = RPC.getServer(TestProtocol.class,
+                                  new TestImpl(), ADDRESS, 0, 1, 3, 200, 
false, conf, null);
+    assertEquals(3, server.getNumReaders());
+    assertEquals(200, server.getMaxQueueSize());
+    server.stop();    
+  }
 
   public void testSlowRpc() throws Exception {
     System.out.println("Testing Slow RPC");
@@ -234,6 +256,10 @@ public class TestRPC extends TestCase {
       System.out.println("Down slow rpc testing");
     }
   }
+  
+  public void testRPCConf(Configuration conf) throws Exception {
+    
+  }
 
   public void testCalls(Configuration conf) throws Exception {
     Server server = RPC.getServer(TestProtocol.class,


Reply via email to