Updated Branches:
  refs/heads/trunk c5f2d61e8 -> 62f9fd3fd

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index bfcfb8d..c033746 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -25,13 +25,10 @@ import org.apache.giraph.time.SystemTime;
 import org.apache.giraph.time.Time;
 import org.apache.giraph.time.Times;
 import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED;
 
@@ -41,7 +38,7 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES
  * @param <R> Request type
  */
 public abstract class RequestServerHandler<R> extends
-    SimpleChannelUpstreamHandler {
+  ChannelInboundHandlerAdapter {
   /** Number of bytes in the encoded response */
   public static final int RESPONSE_BYTES = 13;
   /** Time class to use */
@@ -77,13 +74,13 @@ public abstract class RequestServerHandler<R> extends
   }
 
   @Override
-  public void messageReceived(
-      ChannelHandlerContext ctx, MessageEvent e) {
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+    throws Exception {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("messageReceived: Got " + e.getMessage().getClass());
+      LOG.trace("messageReceived: Got " + msg.getClass());
     }
 
-    WritableRequest writableRequest = (WritableRequest) e.getMessage();
+    WritableRequest writableRequest = (WritableRequest) msg;
 
     // Simulate a closed connection on the first request (if desired)
     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
@@ -91,7 +88,7 @@ public abstract class RequestServerHandler<R> extends
           "request " + writableRequest.getRequestId() + " from " +
           writableRequest.getClientId());
       setAlreadyClosedFirstRequest();
-      ctx.getChannel().close();
+      ctx.close();
       return;
     }
 
@@ -121,11 +118,11 @@ public abstract class RequestServerHandler<R> extends
     }
 
     // Send the response with the request id
-    ChannelBuffer buffer = ChannelBuffers.buffer(RESPONSE_BYTES);
+    ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
     buffer.writeInt(myTaskInfo.getTaskId());
     buffer.writeLong(writableRequest.getRequestId());
     buffer.writeByte(alreadyDone);
-    e.getChannel().write(buffer);
+    ctx.writeAndFlush(buffer);
   }
 
   /**
@@ -143,28 +140,28 @@ public abstract class RequestServerHandler<R> extends
   public abstract void processRequest(R request);
 
   @Override
-  public void channelConnected(ChannelHandlerContext ctx,
-                               ChannelStateEvent e) throws Exception {
+  public void channelActive(ChannelHandlerContext ctx) throws Exception {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("channelConnected: Connected the channel on " +
-          ctx.getChannel().getRemoteAddress());
+      LOG.debug("channelActive: Connected the channel on " +
+          ctx.channel().remoteAddress());
     }
+    ctx.fireChannelActive();
   }
 
   @Override
-  public void channelClosed(ChannelHandlerContext ctx,
-                            ChannelStateEvent e) throws Exception {
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("channelClosed: Closed the channel on " +
-          ctx.getChannel().getRemoteAddress() + " with event " +
-          e);
+      LOG.debug("channelInactive: Closed the channel on " +
+          ctx.channel().remoteAddress());
     }
+    ctx.fireChannelInactive();
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    throws Exception {
     LOG.warn("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+        "remote address " + ctx.channel().remoteAddress(), cause.getCause());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
index 1ba06e9..463a47e 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseClientHandler.java
@@ -20,23 +20,20 @@ package org.apache.giraph.comm.netty.handler;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-
-import java.io.IOException;
+
 import java.util.concurrent.ConcurrentMap;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+
 import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_RESPONSE_FAILED;
 
 /**
  * Generic handler of responses.
  */
-public class ResponseClientHandler extends SimpleChannelUpstreamHandler {
+public class ResponseClientHandler extends ChannelInboundHandlerAdapter {
   /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(ResponseClientHandler.class);
@@ -45,8 +42,8 @@ public class ResponseClientHandler extends 
SimpleChannelUpstreamHandler {
   /** Drop first response (used for simulating failure) */
   private final boolean dropFirstResponse;
   /** Outstanding worker request map */
-  private final ConcurrentMap<ClientRequestId, RequestInfo>
-  workerIdOutstandingRequestMap;
+  private final ConcurrentMap<ClientRequestId,
+      RequestInfo> workerIdOutstandingRequestMap;
 
   /**
    * Constructor.
@@ -64,27 +61,27 @@ public class ResponseClientHandler extends 
SimpleChannelUpstreamHandler {
   }
 
   @Override
-  public void messageReceived(
-      ChannelHandlerContext ctx, MessageEvent event) {
-    if (!(event.getMessage() instanceof ChannelBuffer)) {
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+    throws Exception {
+    if (!(msg instanceof ByteBuf)) {
       throw new IllegalStateException("messageReceived: Got a " +
-          "non-ChannelBuffer message " + event.getMessage());
+          "non-ByteBuf message " + msg);
     }
 
-    ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
-    ChannelBufferInputStream inputStream = new 
ChannelBufferInputStream(buffer);
+    ByteBuf buf = (ByteBuf) msg;
     int senderId = -1;
     long requestId = -1;
     int response = -1;
     try {
-      senderId = inputStream.readInt();
-      requestId = inputStream.readLong();
-      response = inputStream.readByte();
-      inputStream.close();
-    } catch (IOException e) {
+      senderId = buf.readInt();
+      requestId = buf.readLong();
+      response = buf.readByte();
+    } catch (IndexOutOfBoundsException e) {
       throw new IllegalStateException(
-          "messageReceived: Got IOException ", e);
+          "channelRead: Got IndexOutOfBoundsException ", e);
     }
+    ReferenceCountUtil.release(buf);
+
 
     // Simulate a failed response on the first response (if desired)
     if (dropFirstResponse && !ALREADY_DROPPED_FIRST_RESPONSE) {
@@ -98,7 +95,8 @@ public class ResponseClientHandler extends 
SimpleChannelUpstreamHandler {
     }
 
     if (response == 1) {
-      LOG.info("messageReceived: Already completed request " + requestId);
+      LOG.info("messageReceived: Already completed request (taskId = " +
+          senderId + ", requestId = " + requestId + ")");
     } else if (response != 0) {
       throw new IllegalStateException(
           "messageReceived: Got illegal response " + response);
@@ -107,13 +105,13 @@ public class ResponseClientHandler extends 
SimpleChannelUpstreamHandler {
     RequestInfo requestInfo = workerIdOutstandingRequestMap.remove(
         new ClientRequestId(senderId, requestId));
     if (requestInfo == null) {
-      LOG.info("messageReceived: Already received response for request id = " +
-          requestId);
+      LOG.info("messageReceived: Already received response for (taskId = " +
+          senderId + ", requestId = " + requestId + ")");
     } else {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("messageReceived: Completed " + requestInfo +
-            ".  Waiting on " + workerIdOutstandingRequestMap.size() +
-            " requests");
+        LOG.debug("messageReceived: Completed (taskId = " + senderId + ")" +
+            requestInfo + ".  Waiting on " + workerIdOutstandingRequestMap
+            .size() + " requests");
       }
     }
 
@@ -131,17 +129,20 @@ public class ResponseClientHandler extends 
SimpleChannelUpstreamHandler {
   }
 
   @Override
-  public void channelClosed(ChannelHandlerContext ctx,
-                            ChannelStateEvent e) throws Exception {
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
     if (LOG.isDebugEnabled()) {
       LOG.debug("channelClosed: Closed the channel on " +
-          ctx.getChannel().getRemoteAddress());
+          ctx.channel().remoteAddress());
     }
   }
 
   @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
-    LOG.warn("exceptionCaught: Channel failed with " +
-        "remote address " + ctx.getChannel().getRemoteAddress(), e.getCause());
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+    throws Exception {
+    if (LOG.isDebugEnabled()) {
+      LOG.warn("exceptionCaught: Channel failed with " +
+          "remote address " + ctx.channel().remoteAddress(), cause.getCause());
+    }
   }
 }
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
index e148c8c..c0b45fc 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/ResponseEncoder.java
@@ -21,29 +21,28 @@ package org.apache.giraph.comm.netty.handler;
 import org.apache.giraph.comm.requests.RequestType;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
 
 /**
  * How a server should respond to a client. Currently only used for
  * responding to client's SASL messages, and removed after client
  * authenticates.
  */
-public class ResponseEncoder extends OneToOneEncoder {
+public class ResponseEncoder extends ChannelOutboundHandlerAdapter {
   /** Class logger. */
   private static final Logger LOG = Logger.getLogger(ResponseEncoder.class);
   /** Holds the place of the message length until known. */
   private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
 
   @Override
-  protected Object encode(ChannelHandlerContext ctx,
-      Channel channel, Object msg) throws Exception {
+  public void write(ChannelHandlerContext ctx, Object msg,
+    ChannelPromise promise) throws Exception {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("encode(" + ctx + "," + channel + "," + msg);
+      LOG.debug("write(" + ctx + "," + msg);
     }
 
     if (!(msg instanceof WritableRequest)) {
@@ -54,10 +53,10 @@ public class ResponseEncoder extends OneToOneEncoder {
     }
     @SuppressWarnings("unchecked")
     WritableRequest writableRequest =
-      (WritableRequest) msg;
-    ChannelBufferOutputStream outputStream =
-      new ChannelBufferOutputStream(ChannelBuffers.dynamicBuffer(
-        10, ctx.getChannel().getConfig().getBufferFactory()));
+        (WritableRequest) msg;
+    ByteBuf buf = ctx.alloc().buffer(10);
+    ByteBufOutputStream outputStream =
+        new ByteBufOutputStream(buf);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("encode: Encoding a message of type " + msg.getClass());
@@ -71,14 +70,15 @@ public class ResponseEncoder extends OneToOneEncoder {
     writableRequest.write(outputStream);
 
     outputStream.flush();
+    outputStream.close();
 
     // Set the correct size at the end.
-    ChannelBuffer encodedBuffer = outputStream.buffer();
-    encodedBuffer.setInt(0, encodedBuffer.writerIndex() - 4);
+    buf.setInt(0, buf.writerIndex() - 4);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("encode: Encoding a message of type " + msg.getClass());
     }
+    ctx.write(buf, promise);
 /*if[HADOOP_NON_SECURE]
 else[HADOOP_NON_SECURE]*/
     if (writableRequest.getType() == RequestType.SASL_COMPLETE_REQUEST) {
@@ -88,13 +88,13 @@ else[HADOOP_NON_SECURE]*/
       // the ResponseEncoder to remove itself also.
       if (LOG.isDebugEnabled()) {
         LOG.debug("encode: Removing RequestEncoder handler: no longer needed," 
+
-            " since client: " + ctx.getChannel().getRemoteAddress() + " has " +
+            " since client: " + ctx.channel().remoteAddress() + " has " +
             "completed authenticating.");
       }
-      ctx.getPipeline().remove(this);
+      ctx.pipeline().remove(this);
     }
 /*end[HADOOP_NON_SECURE]*/
-    return encodedBuffer;
+    ctx.write(buf, promise);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
index b26a314..8921270 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslClientHandler.java
@@ -27,14 +27,12 @@ import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferInputStream;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelEvent;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.handler.codec.frame.FixedLengthFrameDecoder;
-import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.FixedLengthFrameDecoder;
+import io.netty.util.ReferenceCountUtil;
 
 import java.io.IOException;
 
@@ -42,7 +40,7 @@ import java.io.IOException;
  * Client-side Netty pipeline component that allows authentication with a
  * server.
  */
-public class SaslClientHandler extends OneToOneDecoder {
+public class SaslClientHandler extends ChannelInboundHandlerAdapter {
   /** Class logger */
   private static final Logger LOG = Logger.getLogger(SaslClientHandler.class);
   /** Configuration */
@@ -58,21 +56,14 @@ public class SaslClientHandler extends OneToOneDecoder {
   }
 
   @Override
-  public void handleUpstream(
-    ChannelHandlerContext ctx, ChannelEvent evt)
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
     throws Exception {
-    if (!(evt instanceof MessageEvent)) {
-      ctx.sendUpstream(evt);
-      return;
-    }
-    MessageEvent e = (MessageEvent) evt;
-    Object originalMessage = e.getMessage();
-    Object decodedMessage = decode(ctx, ctx.getChannel(), originalMessage);
+    WritableRequest decodedMessage = decode(ctx, msg);
     // Generate SASL response to server using Channel-local SASL client.
-    SaslNettyClient saslNettyClient = NettyClient.SASL.get(ctx.getChannel());
+    SaslNettyClient saslNettyClient = ctx.attr(NettyClient.SASL).get();
     if (saslNettyClient == null) {
       throw new Exception("handleUpstream: saslNettyClient was unexpectedly " +
-          "null for channel: " + ctx.getChannel());
+          "null for channel: " + ctx.channel());
     }
     if (decodedMessage.getClass() == SaslCompleteRequest.class) {
       if (LOG.isDebugEnabled()) {
@@ -91,8 +82,8 @@ public class SaslClientHandler extends OneToOneDecoder {
       }
       // Remove SaslClientHandler and replace LengthFieldBasedFrameDecoder
       // from client pipeline.
-      ctx.getPipeline().remove(this);
-      ctx.getPipeline().replace("length-field-based-frame-decoder",
+      ctx.pipeline().remove(this);
+      ctx.pipeline().replace("length-field-based-frame-decoder",
           "fixed-length-frame-decoder",
           new FixedLengthFrameDecoder(RequestServerHandler.RESPONSE_BYTES));
       return;
@@ -128,27 +119,34 @@ public class SaslClientHandler extends OneToOneDecoder {
     // server.
     SaslTokenMessageRequest saslResponse =
       new SaslTokenMessageRequest(responseToServer);
-    ctx.getChannel().write(saslResponse);
+    ctx.channel().writeAndFlush(saslResponse);
   }
 
-  @Override
-  protected Object decode(ChannelHandlerContext ctx,
-                          Channel channel, Object msg) throws Exception {
-    if (!(msg instanceof ChannelBuffer)) {
+  /**
+   * Decode the message read by handler
+   *
+   * @param ctx channel handler context
+   * @param msg message to decode into a writable request
+   * @return decoded writablerequest object
+   * @throws Exception
+   */
+  protected WritableRequest decode(ChannelHandlerContext ctx, Object msg)
+    throws Exception {
+    if (!(msg instanceof ByteBuf)) {
       throw new IllegalStateException("decode: Got illegal message " + msg);
     }
     // Decode msg into an object whose class C implements WritableRequest:
     //  C will be either SaslTokenMessage or SaslComplete.
     //
     // 1. Convert message to a stream that can be decoded.
-    ChannelBuffer buffer = (ChannelBuffer) msg;
-    ChannelBufferInputStream inputStream = new 
ChannelBufferInputStream(buffer);
+    ByteBuf buf = (ByteBuf) msg;
+    ByteBufInputStream inputStream = new ByteBufInputStream(buf);
     // 2. Get first byte: message type:
     int enumValue = inputStream.readByte();
     RequestType type = RequestType.values()[enumValue];
     if (LOG.isDebugEnabled()) {
       LOG.debug("decode: Got a response of type " + type + " from server:" +
-        channel.getRemoteAddress());
+        ctx.channel().remoteAddress());
     }
     // 3. Create object of the type determined in step 2.
     Class<? extends WritableRequest> writableRequestClass =
@@ -162,6 +160,7 @@ public class SaslClientHandler extends OneToOneDecoder {
     } catch (IOException e) {
       LOG.error("decode: Exception when trying to read server response: " + e);
     }
+    ReferenceCountUtil.release(buf);
     // serverResponse can now be used in the next stage in pipeline.
     return serverResponse;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
index da06334..fff58cc 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/SaslServerHandler.java
@@ -35,11 +35,11 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.log4j.Logger;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -52,7 +52,7 @@ import static 
org.apache.giraph.conf.GiraphConstants.NETTY_SIMULATE_FIRST_REQUES
  * authenticate themselves with this server.
  */
 public class SaslServerHandler extends
-    SimpleChannelUpstreamHandler {
+    ChannelInboundHandlerAdapter {
     /** Class logger */
   private static final Logger LOG =
       Logger.getLogger(SaslServerHandler.class);
@@ -80,13 +80,14 @@ public class SaslServerHandler extends
   }
 
   @Override
-  public void messageReceived(
-      ChannelHandlerContext ctx, MessageEvent e) {
+  public void channelRead(ChannelHandlerContext ctx, Object msg)
+    throws Exception {
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("messageReceived: Got " + e.getMessage().getClass());
+      LOG.debug("messageReceived: Got " + msg.getClass());
     }
 
-    WritableRequest writableRequest = (WritableRequest) e.getMessage();
+    WritableRequest writableRequest = (WritableRequest) msg;
     // Simulate a closed connection on the first request (if desired)
     // TODO: Move out into a separate, dedicated handler.
     if (closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
@@ -94,7 +95,7 @@ public class SaslServerHandler extends
           "request " + writableRequest.getRequestId() + " from " +
           writableRequest.getClientId());
       setAlreadyClosedFirstRequest();
-      ctx.getChannel().close();
+      ctx.close();
       return;
     }
 
@@ -103,10 +104,10 @@ public class SaslServerHandler extends
       // (in which case we are looking at the first SASL message from the
       // client).
       SaslNettyServer saslNettyServer =
-          NettyServer.CHANNEL_SASL_NETTY_SERVERS.get(ctx.getChannel());
+          ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
       if (saslNettyServer == null) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("No saslNettyServer for " + ctx.getChannel() +
+          LOG.debug("No saslNettyServer for " + ctx.channel() +
               " yet; creating now, with secret manager: " + secretManager);
         }
         try {
@@ -115,19 +116,18 @@ public class SaslServerHandler extends
         } catch (IOException ioe) { //TODO:
           throw new RuntimeException(ioe);
         }
-        NettyServer.CHANNEL_SASL_NETTY_SERVERS.set(ctx.getChannel(),
-            saslNettyServer);
+        ctx.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Found existing saslNettyServer on server:" +
-              ctx.getChannel().getLocalAddress() + " for client " +
-              ctx.getChannel().getRemoteAddress());
+              ctx.channel().localAddress() + " for client " +
+              ctx.channel().remoteAddress());
         }
       }
 
       ((SaslTokenMessageRequest) 
writableRequest).processToken(saslNettyServer);
       // Send response to client.
-      ctx.getChannel().write(writableRequest);
+      ctx.write(writableRequest);
       if (saslNettyServer.isComplete()) {
         // If authentication of client is complete, we will also send a
         // SASL-Complete message to the client.
@@ -136,13 +136,14 @@ public class SaslServerHandler extends
               "username: " + saslNettyServer.getUserName());
         }
         SaslCompleteRequest saslComplete = new SaslCompleteRequest();
-        ctx.getChannel().write(saslComplete);
+        ctx.write(saslComplete);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Removing SaslServerHandler from pipeline since SASL " +
               "authentication is complete.");
         }
-        ctx.getPipeline().remove(this);
+        ctx.pipeline().remove(this);
       }
+      ctx.flush();
       // do not send upstream to other handlers: no further action needs to be
       // done for SASL_TOKEN_MESSAGE_REQUEST requests.
       return;
@@ -154,7 +155,7 @@ public class SaslServerHandler extends
       // not completed.
       LOG.warn("Sending upstream an unexpected non-SASL message :  " +
           writableRequest);
-      ctx.sendUpstream(e);
+      ctx.fireChannelRead(msg);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index abc81e8..ea1f12d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -47,6 +47,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.net.DNS;
 
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
 import java.net.UnknownHostException;
 
 /**
@@ -59,6 +63,9 @@ import java.net.UnknownHostException;
  */
 public class GiraphConfiguration extends Configuration
     implements GiraphConstants {
+  /** ByteBufAllocator to be used by netty */
+  private ByteBufAllocator nettyBufferAllocator = null;
+
   /**
    * Constructor that creates the configuration
    */
@@ -840,6 +847,25 @@ public class GiraphConfiguration extends Configuration
     }
   }
 
+  /**
+   * Used by netty client and server to create ByteBufAllocator
+   *
+   * @return ByteBufAllocator
+   */
+  public ByteBufAllocator getNettyAllocator() {
+    if (nettyBufferAllocator == null) {
+      if (NETTY_USE_POOLED_ALLOCATOR.get(this)) { // Use pooled allocator
+        nettyBufferAllocator = new PooledByteBufAllocator(
+            NETTY_USE_DIRECT_MEMORY.get(this));
+      } else { // Use un-pooled allocator
+        // Note: Current default settings create un-pooled heap allocator
+        nettyBufferAllocator = new UnpooledByteBufAllocator(
+            NETTY_USE_DIRECT_MEMORY.get(this));
+      }
+    }
+    return nettyBufferAllocator;
+  }
+
   public int getZookeeperConnectionAttempts() {
     return ZOOKEEPER_CONNECTION_ATTEMPTS.get(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 9271152..61830e8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -469,6 +469,16 @@ public interface GiraphConstants {
   IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1,
       "TCP backlog (defaults to number of workers)");
 
+  /** Use netty pooled memory buffer allocator */
+  BooleanConfOption NETTY_USE_POOLED_ALLOCATOR = new BooleanConfOption(
+      "giraph.useNettyPooledAllocator", true, "Should netty use pooled " +
+      "memory allocator?");
+
+  /** Use direct memory buffers in netty */
+  BooleanConfOption NETTY_USE_DIRECT_MEMORY = new BooleanConfOption(
+      "giraph.useNettyDirectMemory", true, "Should netty use direct " +
+      "memory buffers");
+
   /** How big to make the encoder buffer? */
   IntConfOption NETTY_REQUEST_ENCODER_BUFFER_SIZE =
       new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB,
@@ -502,7 +512,7 @@ public interface GiraphConstants {
   /** Where to place the netty client execution handle? */
   StrConfOption NETTY_CLIENT_EXECUTION_AFTER_HANDLER =
       new StrConfOption("giraph.nettyClientExecutionAfterHandler",
-          "requestEncoder",
+          "request-encoder",
           "Where to place the netty client execution handle?");
 
   /** Use the execution handler in netty on the server? */

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
 
b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
index 96e3fad..310d38c 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferInputStream.java
@@ -20,21 +20,21 @@ package org.apache.giraph.utils;
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
-import org.jboss.netty.buffer.DynamicChannelBuffer;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Special input that reads from a DynamicChannelBuffer.
  */
 public class DynamicChannelBufferInputStream implements DataInput {
   /** Internal dynamic channel buffer */
-  private DynamicChannelBuffer buffer;
+  private ByteBuf buffer;
 
   /**
    * Constructor.
    *
    * @param buffer Buffer to read from
    */
-  public DynamicChannelBufferInputStream(DynamicChannelBuffer buffer) {
+  public DynamicChannelBufferInputStream(ByteBuf buffer) {
     this.buffer = buffer;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
 
b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
index ca4a7d7..c63627c 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/utils/DynamicChannelBufferOutputStream.java
@@ -20,9 +20,9 @@ package org.apache.giraph.utils;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteOrder;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.DirectChannelBufferFactory;
-import org.jboss.netty.buffer.DynamicChannelBuffer;
+
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
 
 /**
  * Special output stream that can grow as needed and dumps to a
@@ -30,7 +30,7 @@ import org.jboss.netty.buffer.DynamicChannelBuffer;
  */
 public class DynamicChannelBufferOutputStream implements DataOutput {
   /** Internal dynamic channel buffer */
-  private DynamicChannelBuffer buffer;
+  private ByteBuf buffer;
 
   /**
    * Constructor
@@ -38,9 +38,10 @@ public class DynamicChannelBufferOutputStream implements 
DataOutput {
    * @param estimatedLength Estimated length of the buffer
    */
   public DynamicChannelBufferOutputStream(int estimatedLength) {
-    buffer = (DynamicChannelBuffer)
-        ChannelBuffers.dynamicBuffer(ByteOrder.LITTLE_ENDIAN,
-            estimatedLength, DirectChannelBufferFactory.getInstance());
+    buffer = Unpooled.unreleasableBuffer(Unpooled.buffer(estimatedLength))
+        .order(ByteOrder.LITTLE_ENDIAN);
+    // -- TODO unresolved what are benefits of using releasable?
+    // currently nit because it is just used in 1 test file
   }
 
   /**
@@ -48,7 +49,7 @@ public class DynamicChannelBufferOutputStream implements 
DataOutput {
    *
    * @param buffer Buffer to be written to (cleared before use)
    */
-  public DynamicChannelBufferOutputStream(DynamicChannelBuffer buffer) {
+  public DynamicChannelBufferOutputStream(ByteBuf buffer) {
     this.buffer = buffer;
     buffer.clear();
   }
@@ -58,7 +59,7 @@ public class DynamicChannelBufferOutputStream implements 
DataOutput {
    *
    * @return dynamic channel buffer (not a copy)
    */
-  public DynamicChannelBuffer getDynamicChannelBuffer() {
+  public ByteBuf getDynamicChannelBuffer() {
     return buffer;
   }
 
@@ -184,4 +185,3 @@ public class DynamicChannelBufferOutputStream implements 
DataOutput {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java
new file mode 100644
index 0000000..290e0fa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/PipelineUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelPipeline;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+/**
+ * Utility class for ChannelPipeline
+ */
+public class PipelineUtils {
+  /** Do not instantiate. */
+  private PipelineUtils() { }
+
+  /**
+   * Add a handler to pipeline if it is not already added
+   * and configure the handler to use an executor group based on equality
+   * of compareTo and handlerName
+   *
+   * @param handlerName string name of the handler
+   * @param handler channelhandler object
+   * @param compareTo string name to compare to for executorgroup usage check
+   * @param executor executorgroup instance
+   * @param channel netty channel
+   */
+  public static void addLastWithExecutorCheck(String handlerName,
+    ChannelHandler handler, String compareTo, EventExecutorGroup executor,
+    Channel channel) {
+    ChannelPipeline pipeline = channel.pipeline();
+    if (channel.pipeline().get(handlerName) != null) {
+      return;
+    }
+    if (compareTo.equals(handlerName)) {
+      pipeline.addLast(executor, handlerName, handler);
+    } else {
+      pipeline.addLast(handlerName, handler);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 78c230a..08a7914 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -20,8 +20,9 @@ package org.apache.giraph.utils;
 
 import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.util.concurrent.EventExecutorGroup;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -74,6 +75,17 @@ public class ProgressableUtils {
   }
 
   /**
+   * Wait for executorgroup to terminate, while periodically reporting progress
+   *
+   * @param group ExecutorGroup whose termination we are awaiting
+   * @param progressable Progressable for reporting progress (Job context)
+   */
+  public static void awaitTerminationFuture(EventExecutorGroup group,
+                                            Progressable progressable) {
+    waitForever(new FutureWaitable<>(group.terminationFuture()), progressable);
+  }
+
+  /**
    * Wait for the result of the future to be ready, while periodically
    * reporting progress.
    *

http://git-wip-us.apache.org/repos/asf/giraph/blob/62f9fd3f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f06722e..ceaef23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,7 +299,7 @@ under the License.
     <dep.libthrift.version>0.9.0</dep.libthrift.version>
     <dep.log4j.version>1.2.17</dep.log4j.version>
     <dep.mockito.version>1.9.5</dep.mockito.version>
-    <dep.netty.version>3.5.3.Final</dep.netty.version>
+    <dep.netty.version>4.0.14.Final</dep.netty.version>
     <dep.paranamer.version>2.3</dep.paranamer.version>
     <dep.slf4j.version>1.7.5</dep.slf4j.version>
     <dep.tinkerpop.rexter.version>2.4.0</dep.tinkerpop.rexter.version>
@@ -1410,7 +1410,7 @@ under the License.
           </exclusion>
           <exclusion>
             <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+            <artifactId>netty-all</artifactId>
           </exclusion>
         </exclusions>
       </dependency>
@@ -1536,7 +1536,7 @@ under the License.
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>
-        <artifactId>netty</artifactId>
+        <artifactId>netty-all</artifactId>
         <version>${dep.netty.version}</version>
       </dependency>
       <dependency>

Reply via email to