Author: aching
Date: Tue Aug  7 00:08:15 2012
New Revision: 1370071

URL: http://svn.apache.org/viewvc?rev=1370071&view=rev
Log:
GIRAPH-281: Add options to control Netty's per-channel receive and
send buffer sizes (ekoontz via aching).


Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

Modified: giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug  7 00:08:15 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-281: Add options to control Netty's per-channel receive and
+  send buffer sizes (ekoontz via aching).
+
   GIRAPH-228: SimpleTriangleClosingVertex should not use ArrayWritable
   for a vertex value. (Eli Reisman via jghoman)
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug  
7 00:08:15 2012
@@ -27,12 +27,15 @@ import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.Logger;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelPipeline;
@@ -69,6 +72,11 @@ public class NettyClient<I extends Writa
   private final Map<InetSocketAddress, Channel> addressChannelMap =
       new HashMap<InetSocketAddress, Channel>();
 
+  /** Send buffer size */
+  private final int sendBufferSize;
+  /** Receive buffer size */
+  private final int receiveBufferSize;
+
   /**
    * Only constructor
    *
@@ -76,6 +84,12 @@ public class NettyClient<I extends Writa
    */
   public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
     this.context = context;
+    Configuration conf = context.getConfiguration();
+    sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
+        GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
+        GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+
     // Configure the client.
     bootstrap = new ClientBootstrap(
         new NioClientSocketChannelFactory(
@@ -109,6 +123,10 @@ public class NettyClient<I extends Writa
       ChannelFuture connectionFuture = bootstrap.connect(address);
       connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
       connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
+      connectionFuture.getChannel().getConfig().setOption("sendBufferSize",
+          sendBufferSize);
+      connectionFuture.getChannel().getConfig().setOption("receiveBufferSize",
+          receiveBufferSize);
       addressChannelMap.put(address, connectionFuture.getChannel());
 
       waitingConnectionList.add(connectionFuture);
@@ -191,4 +209,15 @@ public class NettyClient<I extends Writa
       }
     }
   }
+
+  /**
+   * Returning configuration of the first channel.
+   * @throws ArrayIndexOutOfBoundsException if no
+   *   channels exist in the client's address => channel map.
+   * @return ChannelConfig for the first channel (if any).
+   */
+  public ChannelConfig getChannelConfig() {
+    return ((Channel) addressChannelMap.values().toArray()[0]).getConfig();
+  }
+
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug  
7 00:08:15 2012
@@ -82,6 +82,10 @@ public class NettyServer<I extends Writa
   private final ServerData<I, V, E, M> serverData;
   /** Server bootstrap */
   private ServerBootstrap bootstrap;
+  /** Send buffer size */
+  private final int sendBufferSize;
+  /** Receive buffer size */
+  private final int receiveBufferSize;
 
   /**
    * Constructor for creating the server
@@ -102,6 +106,11 @@ public class NettyServer<I extends Writa
         new SendPartitionCurrentMessagesRequest<I, V, E, M>());
     requestRegistry.shutdown();
 
+    sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE,
+        GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE);
+    receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
+        GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("Giraph Netty Boss #%d")
       .build();

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug  
7 00:08:15 2012
@@ -183,6 +183,30 @@ public class GiraphJob {
   /** Default maximum number of RPC handlers */
   public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
 
+  /** Client send buffer size */
+  public static final String CLIENT_SEND_BUFFER_SIZE =
+      "giraph.clientSendBufferSize";
+  /** Default client send buffer size of 0.5 MB */
+  public static final int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+
+  /** Client receive buffer size */
+  public static final String CLIENT_RECEIVE_BUFFER_SIZE =
+      "giraph.clientReceiveBufferSize";
+  /** Default client receive buffer size of 32 k */
+  public static final int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+
+  /** Server send buffer size */
+  public static final String SERVER_SEND_BUFFER_SIZE =
+      "giraph.serverSendBufferSize";
+  /** Default server send buffer size of 32 k */
+  public static final int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+
+  /** Server receive buffer size */
+  public static final String SERVER_RECEIVE_BUFFER_SIZE =
+      "giraph.serverReceiveBufferSize";
+  /** Default server receive buffer size of 0.5 MB */
+  public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+
   /**
    *  Maximum number of vertices per partition before sending.
    *  (input superstep only).

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue 
Aug  7 00:08:15 2012
@@ -19,13 +19,17 @@
 package org.apache.giraph.comm;
 
 import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.utils.MockUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -44,10 +48,11 @@ public class ConnectionTest {
    */
   @Test
   public void connectSingleClientServer() throws IOException {
+    Configuration conf = new Configuration();
     @SuppressWarnings("rawtypes")
     Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
 
-    Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
             (SimpleMessageStore.newFactory(
@@ -73,10 +78,10 @@ public class ConnectionTest {
    */
   @Test
   public void connectOneClientToThreeServers() throws IOException {
+    Configuration conf = new Configuration();
     @SuppressWarnings("rawtypes")
     Context context = mock(Context.class);
-
-    Configuration conf = new Configuration();
+    when(context.getConfiguration()).thenReturn(conf);
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
             (SimpleMessageStore.newFactory(
@@ -115,10 +120,11 @@ public class ConnectionTest {
    */
   @Test
   public void connectThreeClientsToOneServer() throws IOException {
+    Configuration conf = new Configuration();
     @SuppressWarnings("rawtypes")
     Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
 
-    Configuration conf = new Configuration();
     ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
             (SimpleMessageStore.newFactory(
@@ -146,4 +152,50 @@ public class ConnectionTest {
     client3.stop();
     server.stop();
   }
+
+
+  /**
+   * Test that we can use Giraph configuration settings to
+   * modify Netty client channel configuration.
+   * TODO: add test for server-side channel configuration as well.
+   *
+   * @throws IOException
+   */
+  @Test
+  public void testClientChannelConfiguration() throws IOException {
+    Configuration conf = new Configuration();
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
+    ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+        new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+            (SimpleMessageStore.newFactory(
+                MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+
+    NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server = 
new NettyServer<IntWritable, IntWritable,
+        IntWritable, IntWritable>(conf, serverData);
+    server.start();
+
+    final int giraphClientSendBufferSize = 
context.getConfiguration().getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
+        GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+    final int giraphClientReceiveBufferSize = 
context.getConfiguration().getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
+        GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+
+    NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+        new NettyClient<IntWritable, IntWritable, IntWritable,
+            IntWritable>(context);
+    client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+
+    DefaultSocketChannelConfig clientConfig = 
(DefaultSocketChannelConfig)client.getChannelConfig();
+    final int channelClientSendBufferSize = clientConfig.getSendBufferSize();
+    final int channelClientReceiveBufferSize = 
clientConfig.getReceiveBufferSize();
+
+    assertEquals(giraphClientSendBufferSize,channelClientSendBufferSize);
+    assertEquals(giraphClientReceiveBufferSize,channelClientReceiveBufferSize);
+
+    client.stop();
+    server.stop();
+
+  }
 }

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: 
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug  
7 00:08:15 2012
@@ -36,6 +36,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -76,9 +77,6 @@ public class RequestTest {
 
   @Before
   public void setUp() throws IOException {
-    @SuppressWarnings("rawtypes")
-    Context context = mock(Context.class);
-
     // Setup the conf
     conf = new Configuration();
     conf.setClass(GiraphJob.VERTEX_CLASS, TestVertex.class, Vertex.class);
@@ -91,6 +89,10 @@ public class RequestTest {
     conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
         IntWritable.class, Writable.class);
 
+    @SuppressWarnings("rawtypes")
+    Context context = mock(Context.class);
+    when(context.getConfiguration()).thenReturn(conf);
+
     // Start the service
     serverData =
         new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>


Reply via email to