Author: aching
Date: Tue Aug 7 18:54:42 2012
New Revision: 1370430
URL: http://svn.apache.org/viewvc?rev=1370430&view=rev
Log:
GIRAPH-289: Add thread and channel pooling to NettyClient and
NettyServer. (ekoontz via aching)
Added:
giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 7 18:54:42 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-289: Add thread and channel pooling to NettyClient and
+ NettyServer. (ekoontz via aching)
+
GIRAPH-276: Fix broken tests in pseudo-distributed mode.
(Alessandro Presta via jghoman)
Added: giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java?rev=1370430&view=auto
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java
(added)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/ChannelRotater.java Tue
Aug 7 18:54:42 2012
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.util.Collection;
+import java.util.List;
+import com.google.common.collect.Lists;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Maintains multiple channels and rotates between them
+ */
+public class ChannelRotater {
+ /** Index of last used channel */
+ private int index = 0;
+ /** Channel list */
+ private List<Channel> channelList = Lists.newArrayList();
+
+ /**
+ * Add a channel to the rotation
+ *
+ * @param channel Channel to add
+ */
+ public void addChannel(Channel channel) {
+ channelList.add(channel);
+ }
+
+ /**
+ * Get the next channel
+ *
+ * @return Next channel
+ */
+ public Channel nextChannel() {
+ if (channelList.isEmpty()) {
+ throw new IllegalArgumentException("nextChannel: No channels exist!");
+ }
+
+ ++index;
+ if (index >= channelList.size()) {
+ index = 0;
+ }
+ return channelList.get(index);
+ }
+
+ /**
+ * Get the channels
+ *
+ * @return Collection of the channels
+ */
+ Collection<Channel> getChannels() {
+ return channelList;
+ }
+}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java Tue Aug
7 18:54:42 2012
@@ -18,10 +18,10 @@
package org.apache.giraph.comm;
+import com.google.common.collect.Maps;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
@@ -69,9 +69,10 @@ public class NettyClient<I extends Writa
* Map of the peer connections, mapping from remote socket address to client
* meta data
*/
- private final Map<InetSocketAddress, Channel> addressChannelMap =
- new HashMap<InetSocketAddress, Channel>();
-
+ private final Map<InetSocketAddress, ChannelRotater> addressChannelMap =
+ Maps.newHashMap();
+ /** Number of channels per server */
+ private final int channelsPerServer;
/** Send buffer size */
private final int sendBufferSize;
/** Receive buffer size */
@@ -85,6 +86,9 @@ public class NettyClient<I extends Writa
public NettyClient(Mapper<?, ?, ?, ?>.Context context) {
this.context = context;
Configuration conf = context.getConfiguration();
+ this.channelsPerServer = conf.getInt(
+ GiraphJob.CHANNELS_PER_SERVER,
+ GiraphJob.DEFAULT_CHANNELS_PER_SERVER);
sendBufferSize = conf.getInt(GiraphJob.CLIENT_SEND_BUFFER_SIZE,
GiraphJob.DEFAULT_CLIENT_SEND_BUFFER_SIZE);
receiveBufferSize = conf.getInt(GiraphJob.CLIENT_RECEIVE_BUFFER_SIZE,
@@ -94,7 +98,9 @@ public class NettyClient<I extends Writa
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
+ Executors.newCachedThreadPool(),
+ conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ NettyServer.DEFAULT_MAXIMUM_THREAD_POOL_SIZE)));
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -112,32 +118,52 @@ public class NettyClient<I extends Writa
*
* @param addresses Addresses to connect to (if haven't already connected)
*/
- public void connectAllAdddresses(Collection<InetSocketAddress> addresses) {
+ public void connectAllAddresses(Collection<InetSocketAddress> addresses) {
List<ChannelFuture> waitingConnectionList =
new ArrayList<ChannelFuture>();
for (InetSocketAddress address : addresses) {
+ if (address == null) {
+ throw new IllegalStateException("connectAllAddresses: Null address " +
+ "in addresses " + addresses);
+ }
+
if (addressChannelMap.containsKey(address)) {
continue;
}
- // Start connecting to the remote server
- ChannelFuture connectionFuture = bootstrap.connect(address);
- connectionFuture.getChannel().getConfig().setOption("tcpNoDelay", true);
- connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
- connectionFuture.getChannel().getConfig().setOption("sendBufferSize",
- sendBufferSize);
- connectionFuture.getChannel().getConfig().setOption("receiveBufferSize",
- receiveBufferSize);
- addressChannelMap.put(address, connectionFuture.getChannel());
- waitingConnectionList.add(connectionFuture);
+ // Start connecting to the remote server up to n time
+ ChannelRotater channelRotater = new ChannelRotater();
+ for (int i = 0; i < channelsPerServer; ++i) {
+ ChannelFuture connectionFuture = bootstrap.connect(address);
+ connectionFuture.getChannel().getConfig().setOption("tcpNoDelay",
true);
+ connectionFuture.getChannel().getConfig().setOption("keepAlive", true);
+ connectionFuture.getChannel().getConfig().setOption(
+ "sendBufferSize", sendBufferSize);
+ connectionFuture.getChannel().getConfig().setOption(
+ "receiveBufferSize", receiveBufferSize);
+ channelRotater.addChannel(connectionFuture.getChannel());
+ waitingConnectionList.add(connectionFuture);
+ }
+ addressChannelMap.put(address, channelRotater);
}
// Wait for all the connections to succeed
for (ChannelFuture waitingConnection : waitingConnectionList) {
- waitingConnection.awaitUninterruptibly().getChannel();
+ ChannelFuture future =
+ waitingConnection.awaitUninterruptibly();
+ if (!future.isSuccess()) {
+ throw new IllegalStateException("connectAllAddresses: Future failed " +
+ "with " + future.getCause());
+ }
+ Channel channel = future.getChannel();
if (LOG.isInfoEnabled()) {
- LOG.info("connectAllAaddresses: Connected to " +
- waitingConnection.getChannel().getRemoteAddress());
+ LOG.info("connectAllAddresses: Connected to " +
+ channel.getRemoteAddress());
+ }
+
+ if (channel.getRemoteAddress() == null) {
+ throw new IllegalStateException("connectAllAddresses: Null remote " +
+ "address!");
}
}
}
@@ -149,23 +175,29 @@ public class NettyClient<I extends Writa
// close connections asyncronously, in a Netty-approved
// way, without cleaning up thread pools until all channels
// in addressChannelMap are closed (success or failure)
- final int done = addressChannelMap.size();
+ int channelCount = 0;
+ for (ChannelRotater channelRotater : addressChannelMap.values()) {
+ channelCount += channelRotater.getChannels().size();
+ }
+ final int done = channelCount;
final AtomicInteger count = new AtomicInteger(0);
- for (Channel channel : addressChannelMap.values()) {
- ChannelFuture result = channel.close();
- result.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture cf) {
- if (count.incrementAndGet() == done) {
- if (LOG.isInfoEnabled()) {
- LOG.info("stop: reached wait threshold, " +
- done + " connections closed, releasing " +
- "NettyClient.bootstrap resources now.");
+ for (ChannelRotater channelRotater : addressChannelMap.values()) {
+ for (Channel channel : channelRotater.getChannels()) {
+ ChannelFuture result = channel.close();
+ result.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture cf) {
+ if (count.incrementAndGet() == done) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: reached wait threshold, " +
+ done + " connections closed, releasing " +
+ "NettyClient.bootstrap resources now.");
+ }
+ bootstrap.releaseExternalResources();
}
- bootstrap.releaseExternalResources();
}
- }
- });
+ });
+ }
}
}
@@ -176,9 +208,9 @@ public class NettyClient<I extends Writa
* @param request Request to send
*/
public void sendWritableRequest(InetSocketAddress remoteServer,
- WritableRequest<I, V, E, M> request) {
+ WritableRequest<I, V, E, M> request) {
waitingRequestCount.incrementAndGet();
- Channel channel = addressChannelMap.get(remoteServer);
+ Channel channel = addressChannelMap.get(remoteServer).nextChannel();
if (channel == null) {
throw new IllegalStateException(
"sendWritableRequest: No channel exists for " + remoteServer);
@@ -194,11 +226,6 @@ public class NettyClient<I extends Writa
public void waitAllRequests() {
synchronized (waitingRequestCount) {
while (waitingRequestCount.get() != 0) {
- if (LOG.isInfoEnabled()) {
- LOG.info("waitAllRequests: Waiting interval of " +
- WAITING_REQUEST_MSECS + " msecs and still waiting on " +
- waitingRequestCount + " requests");
- }
try {
waitingRequestCount.wait(WAITING_REQUEST_MSECS);
} catch (InterruptedException e) {
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java Tue Aug
7 18:54:42 2012
@@ -59,7 +59,7 @@ public class NettyServer<I extends Writa
V extends Writable, E extends Writable,
M extends Writable> {
/** Default maximum thread pool size */
- public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 64;
+ public static final int DEFAULT_MAXIMUM_THREAD_POOL_SIZE = 32;
/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyServer.class);
/** Configuration */
@@ -124,16 +124,12 @@ public class NettyServer<I extends Writa
}
maximumPoolSize = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
DEFAULT_MAXIMUM_THREAD_POOL_SIZE);
- try {
- workerThreadPool =
- (ThreadPoolExecutor) Executors.newCachedThreadPool(workerFactory);
- workerThreadPool.setMaximumPoolSize(maximumPoolSize);
- } catch (ClassCastException e) {
- LOG.warn("Netty worker thread pool is not of type ThreadPoolExecutor",
e);
- }
+ Executors.newCachedThreadPool(workerFactory);
+
channelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory),
- workerThreadPool);
+ Executors.newCachedThreadPool(workerFactory),
+ maximumPoolSize);
}
/**
@@ -192,6 +188,9 @@ public class NettyServer<I extends Writa
accepted.add(ch);
tcpNoDelay = ch.getConfig().setOption("tcpNoDelay", true);
keepAlive = ch.getConfig().setOption("keepAlive", true);
+ ch.getConfig().setOption("sendBufferSize", sendBufferSize);
+ ch.getConfig().setOption("receiveBufferSize", receiveBufferSize);
+
break;
} catch (ChannelException e) {
LOG.warn("start: Likely failed to bind on attempt " +
@@ -211,7 +210,8 @@ public class NettyServer<I extends Writa
"communication server: " + myAddress + " with up to " +
maximumPoolSize + " threads on bind attempt " + bindAttempts +
" with tcpNoDelay = " + tcpNoDelay + " and keepAlive = " +
- keepAlive);
+ keepAlive + " sendBufferSize = " + sendBufferSize +
+ " receiveBufferSize = " + receiveBufferSize);
}
}
Modified:
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
(original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
Tue Aug 7 18:54:42 2012
@@ -132,7 +132,7 @@ public class NettyWorkerClient<I extends
}
addresses.add(partitionOwner.getWorkerInfo().getHostnamePort());
}
- nettyClient.connectAllAdddresses(addresses);
+ nettyClient.connectAllAddresses(addresses);
}
/**
@@ -188,10 +188,10 @@ public class NettyWorkerClient<I extends
getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
Map<I, Collection<M>> partitionMessages =
sendMessageCache.removePartitionMessages(partitionId);
- WritableRequest<I, V, E, M> writableReauest =
+ WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMessagesRequest<I, V, E, M>(
partitionId, partitionMessages);
- nettyClient.sendWritableRequest(remoteServerAddress, writableReauest);
+ nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
}
}
@@ -258,10 +258,10 @@ public class NettyWorkerClient<I extends
getInetSocketAddress(partitionOwner.getWorkerInfo(), partitionId);
Map<I, VertexMutations<I, V, E, M>> partitionMutations =
sendMutationsCache.removePartitionMutations(partitionId);
- WritableRequest<I, V, E, M> writableReauest =
+ WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
partitionId, partitionMutations);
- nettyClient.sendWritableRequest(remoteServerAddress, writableReauest);
+ nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Tue Aug
7 18:54:42 2012
@@ -233,6 +233,12 @@ public class GiraphJob {
/** Default number of messages that can be bulk sent during a flush */
public static final int DEFAULT_MAX_MESSAGES_PER_FLUSH_PUT = 2000;
+ /** Number of channels used per server */
+ public static final String CHANNELS_PER_SERVER =
+ "giraph.channelsPerServer";
+ /** Default number of channels used per server of 1 */
+ public static final int DEFAULT_CHANNELS_PER_SERVER = 1;
+
/** Number of flush threads per peer */
public static final String MSG_NUM_FLUSH_THREADS =
"giraph.msgNumFlushThreads";
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java Tue
Aug 7 18:54:42 2012
@@ -65,7 +65,7 @@ public class ConnectionTest {
NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client =
new NettyClient<IntWritable, IntWritable, IntWritable,
IntWritable>(context);
- client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+ client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
client.stop();
server.stop();
@@ -105,7 +105,7 @@ public class ConnectionTest {
IntWritable>(context);
List<InetSocketAddress> serverAddresses =
new ArrayList<InetSocketAddress>();
- client.connectAllAdddresses(serverAddresses);
+ client.connectAllAddresses(serverAddresses);
client.stop();
server1.stop();
@@ -137,15 +137,15 @@ public class ConnectionTest {
NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client1 =
new NettyClient<IntWritable, IntWritable, IntWritable,
IntWritable>(context);
- client1.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+ client1.connectAllAddresses(Collections.singleton(server.getMyAddress()));
NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client2 =
new NettyClient<IntWritable, IntWritable, IntWritable,
IntWritable>(context);
- client2.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+ client2.connectAllAddresses(Collections.singleton(server.getMyAddress()));
NettyClient<IntWritable, IntWritable, IntWritable, IntWritable> client3 =
new NettyClient<IntWritable, IntWritable, IntWritable,
IntWritable>(context);
- client3.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+ client3.connectAllAddresses(Collections.singleton(server.getMyAddress()));
client1.stop();
client2.stop();
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1370430&r1=1370429&r2=1370430&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
(original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Tue Aug
7 18:54:42 2012
@@ -105,7 +105,7 @@ public class RequestTest {
client =
new NettyClient<IntWritable, IntWritable, IntWritable, IntWritable>
(context);
- client.connectAllAdddresses(Collections.singleton(server.getMyAddress()));
+ client.connectAllAddresses(Collections.singleton(server.getMyAddress()));
}
@Test