Author: maja
Date: Tue Dec 4 18:52:30 2012
New Revision: 1417114
URL: http://svn.apache.org/viewvc?rev=1417114&view=rev
Log:
GIRAPH-441: Keep track of connected channels in NettyServer
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Modified: giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Dec 4 18:52:30 2012
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-441: Keep track of connected channels in NettyServer (majakabiljo)
+
GIRAPH-440: ProgressableUtils - TimeoutException from future.get shouldn't
be rethrown (majakabiljo)
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
Tue Dec 4 18:52:30 2012
@@ -47,6 +47,7 @@ import org.apache.giraph.comm.requests.S
/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.graph.TaskInfo;
+import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
@@ -399,8 +400,8 @@ else[HADOOP_NON_SECURE]*/
List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
context.progress();
- ChannelFuture future =
- waitingConnection.future.awaitUninterruptibly();
+ ChannelFuture future = waitingConnection.future;
+ ProgressableUtils.awaitChannelFuture(future, context);
if (!future.isSuccess()) {
LOG.warn("connectAllAddresses: Future failed " +
"to connect with " + waitingConnection.address + " with " +
@@ -596,7 +597,7 @@ else[HADOOP_NON_SECURE]*/
int reconnectFailures = 0;
while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
- connectionFuture.awaitUninterruptibly();
+ ProgressableUtils.awaitChannelFuture(connectionFuture, context);
if (connectionFuture.isSuccess()) {
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Connected to " + remoteServer + "!");
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
Tue Dec 4 18:52:30 2012
@@ -47,10 +47,13 @@ import org.jboss.netty.bootstrap.ServerB
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelLocal;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -271,6 +274,18 @@ else[HADOOP_NON_SECURE]*/
/*end[HADOOP_NON_SECURE]*/
ChannelPipeline pipeline = pipeline();
+ // Store all connected channels in order to ensure that we can close
+ // them on stop(), or else stop() may hang waiting for the
+ // connections to close on their own
+ pipeline.addLast("connectedChannels",
+ new SimpleChannelUpstreamHandler() {
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent e) throws Exception {
+ super.channelConnected(ctx, e);
+ accepted.add(e.getChannel());
+ }
+ });
pipeline.addLast("serverByteCounter", byteCounter);
pipeline.addLast("requestFrameDecoder",
new LengthFieldBasedFrameDecoder(
@@ -363,7 +378,14 @@ else[HADOOP_NON_SECURE]*/
}
ProgressableUtils.awaitChannelGroupFuture(accepted.close(), progressable);
bossExecutorService.shutdownNow();
+ ProgressableUtils.awaitExecutorTermination(bossExecutorService,
+ progressable);
workerExecutorService.shutdownNow();
+ ProgressableUtils.awaitExecutorTermination(workerExecutorService,
+ progressable);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("stop: Start releasing resources");
+ }
bootstrap.releaseExternalResources();
channelFactory.releaseExternalResources();
if (LOG.isInfoEnabled()) {
Modified:
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
URL:
http://svn.apache.org/viewvc/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java?rev=1417114&r1=1417113&r2=1417114&view=diff
==============================================================================
---
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
(original)
+++
giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
Tue Dec 4 18:52:30 2012
@@ -20,6 +20,7 @@ package org.apache.giraph.utils;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
+import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import java.util.concurrent.ExecutionException;
@@ -79,6 +80,18 @@ public class ProgressableUtils {
}
/**
+ * Wait for {@link ChannelFuture} to finish, while periodically
+ * reporting progress.
+ *
+ * @param future ChannelFuture
+ * @param progressable Progressable for reporting progress (Job context)
+ */
+ public static void awaitChannelFuture(ChannelFuture future,
+ Progressable progressable) {
+ waitForever(new ChannelFutureWaitable(future), progressable);
+ }
+
+ /**
* Wait forever for waitable to finish. Periodically reports progress.
*
* @param waitable Waitable which we wait for
@@ -268,7 +281,7 @@ public class ProgressableUtils {
}
/**
- * {@link Waitable} for waiting on a {@link ChannelGroupFutureWaitable} to
+ * {@link Waitable} for waiting on a {@link ChannelGroupFuture} to
* terminate.
*/
private static class ChannelGroupFutureWaitable extends
@@ -295,4 +308,32 @@ public class ProgressableUtils {
return future.isDone();
}
}
+
+ /**
+ * {@link Waitable} for waiting on a {@link ChannelFuture} to
+ * terminate.
+ */
+ private static class ChannelFutureWaitable extends WaitableWithoutResult {
+ /** ChannelGroupFuture which we want to wait for */
+ private final ChannelFuture future;
+
+ /**
+ * Constructor
+ *
+ * @param future ChannelFuture which we want to wait for
+ */
+ public ChannelFutureWaitable(ChannelFuture future) {
+ this.future = future;
+ }
+
+ @Override
+ public void waitFor(int msecs) throws InterruptedException {
+ future.await(msecs, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isFinished() {
+ return future.isDone();
+ }
+ }
}