Author: aching
Date: Tue Aug 7 00:08:15 2012
New Revision: 1370071
URL: http://svn.apache.org/viewvc?rev=1370071&view=rev
Log:
GIRAPH-281: Add options to control Netty's per-channel receive and
send buffer sizes (ekoontz via aching).
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 7 00:08:15 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-281: Add options to control Netty's per-channel receive and
+ send buffer sizes (ekoontz via aching).
+
GIRAPH-228: SimpleTriangleClosingVertex should not use ArrayWritable
for a vertex value. (Eli Reisman via jghoman)
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug
7 00:08:15 2012
@@ -27,12 +27,15 @@ import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline;
@@ -69,6 +72,11 @@ public class NettyClient<I extends Writa
private final Map<InetSocketAddress, Channel> addressChannelMap =
new HashMap<InetSocketAddress, Channel>();
+ /** Send buffer size */
+ private final int sendBufferSize;
+ /** Receive buffer size */
+ private final int receiveBufferSize;
+
/**
* Only constructor
*
@@ -76,6 +84,12 @@ public class NettyClient<I extends Writa
*/
public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
+ Configuration conf = context.getConfiguration();
+ sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
+ GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+ receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
+ GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
@@ -109,6 +123,10 @@ public class NettyClient<I extends Writa
ChannelFuture connectionFuture = bootstrap.connect(address);
connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
+ connectionFuture.getChannel().getConfig().setOption("sendBufferSize",
+ sendBufferSize);
+ connectionFuture.getChannel().getConfig().setOption("receiveBufferSize",
+ receiveBufferSize);
addressChannelMap.put(address, connectionFuture.getChannel());
waitingConnectionList.add(connectionFuture);
@@ -191,4 +209,15 @@ public class NettyClient<I extends Writa
}
}
}
+
+ /**
+ * Returning configuration of the first channel.
+ * @throws ArrayIndexOutOfBoundsException if no
+ * channels exist in the client's address => channel map.
+ * @return ChannelConfig for the first channel (if any).
+ */
+ public ChannelConfig getChannelConfig() {
+ return ((Channel) addressChannelMap.values().toArray()[0]).getConfig();
+ }
+
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug
7 00:08:15 2012
@@ -82,6 +82,10 @@ public class NettyServer<I extends Writa
private final ServerData<I, V, E, M> serverData;
/** Server bootstrap */
private ServerBootstrap bootstrap;
+ /** Send buffer size */
+ private final int sendBufferSize;
+ /** Receive buffer size */
+ private final int receiveBufferSize;
/**
* Constructor for creating the server
@@ -102,6 +106,11 @@ public class NettyServer<I extends Writa
new SendPartitionCurrentMessagesRequest<I, V, E, M>());
requestRegistry.shutdown();
+ sendBufferSize = conf.getInt(GiraphJob.SERVER_SEND_BUFFER_SIZE,
+ GiraphJob.DEFAULT_SERVER_SEND_BUFFER_SIZE);
+ receiveBufferSize = conf.getInt(GiraphJob.SERVER_RECEIVE_BUFFER_SIZE,
+ GiraphJob.DEFAULT_SERVER_RECEIVE_BUFFER_SIZE);
+
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("Giraph Netty Boss #%d")
.build();
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug
7 00:08:15 2012
@@ -183,6 +183,30 @@ public class GiraphJob {
/** Default maximum number of RPC handlers */
public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
+ /** Client send buffer size */
+ public static final String CLIENT_SEND_BUFFER_SIZE =
+ "giraph.clientSendBufferSize";
+ /** Default client send buffer size of 0.5 MB */
+ public static final int DEFAULT_CLIENT_SEND_BUFFER_SIZE = 512 * 1024;
+
+ /** Client receive buffer size */
+ public static final String CLIENT_RECEIVE_BUFFER_SIZE =
+ "giraph.clientReceiveBufferSize";
+ /** Default client receive buffer size of 32 k */
+ public static final int DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE = 32 * 1024;
+
+ /** Server send buffer size */
+ public static final String SERVER_SEND_BUFFER_SIZE =
+ "giraph.serverSendBufferSize";
+ /** Default server send buffer size of 32 k */
+ public static final int DEFAULT_SERVER_SEND_BUFFER_SIZE = 32 * 1024;
+
+ /** Server receive buffer size */
+ public static final String SERVER_RECEIVE_BUFFER_SIZE =
+ "giraph.serverReceiveBufferSize";
+ /** Default server receive buffer size of 0.5 MB */
+ public static final int DEFAULT_SERVER_RECEIVE_BUFFER_SIZE = 512 * 1024;
+
/**
* Maximum number of vertices per partition before sending.
* (input superstep only).
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue
Aug 7 00:08:15 2012
@@ -19,13 +19,17 @@
package org.apache.giraph.comm;
import org.apache.giraph.comm.messages.SimpleMessageStore;
+import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.utils.MockUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.jboss.netty.channel.socket.DefaultSocketChannelConfig;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -44,10 +48,11 @@ public class ConnectionTest {
*/
@Test
public void connectSingleClientServer() throws IOException {
+ Configuration conf = new Configuration();
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(conf);
- Configuration conf = new Configuration();
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(SimpleMessageStore.newFactory(
@@ -73,10 +78,10 @@ public class ConnectionTest {
*/
@Test
public void connectOneClientToThreeServers() throws IOException {
+ Configuration conf = new Configuration();
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
-
- Configuration conf = new Configuration();
+ when(context.getConfiguration()).thenReturn(conf);
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(SimpleMessageStore.newFactory(
@@ -115,10 +120,11 @@ public class ConnectionTest {
*/
@Test
public void connectThreeClientsToOneServer() throws IOException {
+ Configuration conf = new Configuration();
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(conf);
- Configuration conf = new Configuration();
ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
(SimpleMessageStore.newFactory(
@@ -146,4 +152,50 @@ public class ConnectionTest {
client3.stop();
server.stop();
}
+
+
+ /**
+ * Test that we can use Giraph configuration settings to
+ * modify Netty client channel configuration.
+ * TODO: add test for server-side channel configuration as well.
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testClientChannelConfiguration() throws IOException {
+ Configuration conf = new Configuration();
+ @SuppressWarnings("rawtypes")
+ Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(conf);
+
+ ServerData<IntWritable, IntWritable, IntWritable, IntWritable> serverData =
+ new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ (SimpleMessageStore.newFactory(
+ MockUtils.mockServiceGetVertexPartitionOwner(1), conf));
+
+ NettyServer<IntWritable, IntWritable, IntWritable, IntWritable> server =
new NettyServer<IntWritable, IntWritable,
+ IntWritable, IntWritable>(conf, serverData);
+ server.start();
+
+ final int giraphClientSendBufferSize =
context.getConfiguration().getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
+ GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
+ final int giraphClientReceiveBufferSize =
context.getConfiguration().getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
+ GiraphJob.DEFAULT_CLIENT_RECEIVE_BUFFER_SIZE);
+
+ NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
+ new NettyClient<IntWritable, IntWritable, IntWritable,
+ IntWritable>(context);
+ client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+
+ DefaultSocketChannelConfig clientConfig =
(DefaultSocketChannelConfig)client.getChannelConfig();
+ final int channelClientSendBufferSize = clientConfig.getSendBufferSize();
+ final int channelClientReceiveBufferSize =
clientConfig.getReceiveBufferSize();
+
+ assertEquals(giraphClientSendBufferSize,channelClientSendBufferSize);
+ assertEquals(giraphClientReceiveBufferSize,channelClientReceiveBufferSize);
+
+ client.stop();
+ server.stop();
+
+ }
}
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1370071&r1=1370070&r2=1370071&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug
7 00:08:15 2012
@@ -36,6 +36,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -76,9 +77,6 @@ public class RequestTest {
@Before
public void setUp() throws IOException {
- @SuppressWarnings("rawtypes")
- Context context = mock(Context.class);
-
// Setup the conf
conf = new Configuration();
conf.setClass(GiraphJob.VERTEX_CLASS, TestVertex.class, Vertex.class);
@@ -91,6 +89,10 @@ public class RequestTest {
conf.setClass(GiraphJob.MESSAGE_VALUE_CLASS,
IntWritable.class, Writable.class);
+ @SuppressWarnings("rawtypes")
+ Context context = mock(Context.class);
+ when(context.getConfiguration()).thenReturn(conf);
+
// Start the service
serverData =
new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>