Let me list the differences introduced by the migration from Netty 3.x to 4.x.
 There is a migration guide that mentions most (but not all) of the changes: 
 Please note that the below code changes are based on Wei-Chiu's branch: 
h2. CHANGES IN ShuffleHandler
h3. *I will list the changes mostly from ShuffleHandler, as it covers almost all types of changes made in the other classes as well.*

*In TestShuffleHandler, the test code was changed for the same reasons as listed below.*
h3. Change category #1: General API changes / non-configuration getters:

{quote}Non-configuration getters have no get- prefix anymore. (e.g. Channel.getRemoteAddress() → Channel.remoteAddress())
 Boolean properties are still prefixed with is- to avoid confusion (e.g. 'empty' is both an adjective and a verb, so empty() can have two meanings.){quote}
I'm just listing all the changes without additional context (i.e. without the method in which they were made), as they are simply method renamings:
-        future.getChannel().close();
+        future.channel().closeFuture().awaitUninterruptibly();
-          ChannelPipeline pipeline = future.getChannel().getPipeline();
+          ChannelPipeline pipeline = future.channel().pipeline();
-    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    port = ((InetSocketAddress)ch.localAddress()).getPort();
-      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
-        e.getChannel().close();
+      if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+        ctx.channel().close();
-      accepted.add(evt.getChannel());
+      accepted.add(ctx.channel());
-        new QueryStringDecoder(request.getUri()).getParameters();
+        new QueryStringDecoder(request.getUri()).parameters(); // getUri was not changed, see this later
-      Channel ch = evt.getChannel();
-      ChannelPipeline pipeline = ch.getPipeline();
+      Channel ch = ctx.channel();
+      ChannelPipeline pipeline = ch.pipeline();
-              reduceContext.getCtx().getChannel(),
+              reduceContext.getCtx().channel(),
-      if (ch.getPipeline().get(SslHandler.class) == null) {
+      if (ch.pipeline().get(SslHandler.class) == null) {
-      Channel ch = evt.getChannel();
-      ChannelPipeline pipeline = ch.getPipeline();
+      Channel ch = ctx.channel();
+      ChannelPipeline pipeline = ch.pipeline();
+      ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
-      Channel ch = e.getChannel();
-      Throwable cause = e.getCause();
+      Channel ch = ctx.channel();
h3. Change category #2: General API changes / Method signature changes.

*2.1: SimpleChannelUpstreamHandler was renamed to ChannelInboundHandlerAdapter.*
{quote}The terms 'upstream' and 'downstream' were pretty confusing to beginners. 4.0 uses 'inbound' and 'outbound' wherever possible.{quote}
-  class Shuffle extends SimpleChannelUpstreamHandler {
+  @ChannelHandler.Sharable
+  class Shuffle extends ChannelInboundHandlerAdapter {
*2.2: Simplified channel state model:*
{quote}channelOpen, channelBound, and channelConnected have been merged to channelActive. channelDisconnected, channelUnbound, and channelClosed have been merged to channelInactive. Likewise, Channel.isBound() and isConnected() have been merged to isActive().{quote}
*2.2.1 Changes in class: Shuffle*
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
+      super.channelActive(ctx);
*2.2.2 Changes in 
 Quoting the change again:
{quote}channelOpen, channelBound, and channelConnected have been merged to channelActive. channelDisconnected, channelUnbound, and channelClosed have been merged to channelInactive. Likewise, Channel.isBound() and isConnected() have been merged to isActive().{quote}
       LOG.error("Shuffle error: ", cause);
-      if (ch.isConnected()) {
-        LOG.error("Shuffle error " + e);
+      if (ch.isOpen()) {
         sendError(ctx, INTERNAL_SERVER_ERROR);
{color:#ff0000}I think here we have an issue. The doc says: "Likewise, 
Channel.isBound() and isConnected() have been merged to isActive()."{color}
 {color:#ff0000} So isOpen should be replaced with isActive().{color}

*2.3 Change in method name: 
io.netty.channel.ChannelInboundHandlerAdapter#channelRead vs. old name: 
 Changes in class: Shuffle
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
+      HttpRequest request = (HttpRequest) msg;
*2.4 The method parameter of channelRead vs. messageReceived was also changed.*
 This is detailed here: 
 Changes in class: Shuffle
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
+      HttpRequest request = (HttpRequest) msg;
Consequently, as no event object is received anymore, the channel has to be obtained from the context instead of the event:
-      Channel ch = evt.getChannel();
-      ChannelPipeline pipeline = ch.getPipeline();
+      Channel ch = ctx.channel();
+      ChannelPipeline pipeline = ch.pipeline();
*2.5 ChannelHandler method signature changes: exceptionCaught*

The ExceptionEvent is not passed anymore; instead, the exact cause of the issue is passed along as a Throwable.
 Consequently, there is no event to get the cause object from.
 Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught:
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
-      Throwable cause = e.getCause();
h3. Change category #3: Behavioral change: write() does not flush automatically:


*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead:*
         populateHeaders(mapIds, jobId, user, reduceId, request,
           response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
-        ch.write(response);
+        ch.writeAndFlush(response);

-      ch.write(response);
+      ch.writeAndFlush(response);
       //Initialize one ReduceContext object per messageReceived call
       boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
       ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#sendMapOutput:*
@@ -1259,7 +1272,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
         new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
       final DataOutputBuffer dob = new DataOutputBuffer();
-      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
       ChannelFuture writeFuture;
         final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
             info.startOffset, info.partLength, manageOsCache, readaheadLength,
             readaheadPool, spillfile.getAbsolutePath(), 
             shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = ch.write(partition);
+        writeFuture = ch.writeAndFlush(partition);
             info.startOffset, info.partLength, sslFileBufferSize,
             manageOsCache, readaheadLength, readaheadPool,
-        writeFuture = ch.write(chunk);
+        writeFuture = ch.writeAndFlush(chunk);
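For context: in Netty 4, write() only queues the message in the channel's outbound buffer and nothing goes out until flush() is called, while writeAndFlush() does both. The sketch below is only an analogy in plain JDK code (BufferedOutputStream, not Netty; the class and method names are made up for illustration), showing the same "written but not yet visible until flushed" behaviour.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Analogy only: a buffered stream stands in for a Netty 4 channel.
final class WriteVsFlushDemo {
  private WriteVsFlushDemo() {}

  /** Returns {bytes in the sink after write(), bytes in the sink after flush()}. */
  static int[] bytesVisibleBeforeAndAfterFlush(String payload) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    BufferedOutputStream out = new BufferedOutputStream(sink, 8192);
    try {
      // Like write() in Netty 4: the data is only buffered.
      out.write(payload.getBytes(StandardCharsets.UTF_8));
      int before = sink.size();
      // Like flush(): the buffered data is pushed to the underlying sink.
      out.flush();
      int after = sink.size();
      return new int[] {before, after};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] result = bytesVisibleBeforeAndAfterFlush("shuffle header");
    System.out.println("after write: " + result[0] + " bytes, after flush: " + result[1] + " bytes");
  }
}
```

This would explain why every ch.write(...) whose result must reach the client right away became ch.writeAndFlush(...) in the branch.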
h3. Change category #4: Releasing resources of / Deallocating 

I couldn't really find any reference in the migration guide for this one.
 *I can see this change in ShuffleHandler:*
@@ -1284,7 +1297,7 @@ public void operationComplete(ChannelFuture future) {
             if (future.isSuccess()) {
-            partition.releaseExternalResources();
+            partition.deallocate();
       } else {
*This calls FadvisedFileRegion#deallocate (formerly releaseExternalResources):*
-  public void releaseExternalResources() {
+  protected void deallocate() {
     if (readaheadRequest != null) {
-    super.releaseExternalResources();
+    super.deallocate();
There's also a related change in TestFadvisedFileRegion.
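A possible explanation, as far as I can tell from the Netty 4 sources: FileRegion is reference counted there (DefaultFileRegion extends AbstractReferenceCounted), and deallocate() is the protected hook that release() invokes once the reference count drops to zero. The sketch below is a simplified plain-Java model of that contract; CountedRegion and its members are hypothetical names, not Netty classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of a reference-counted resource; not Netty's implementation.
final class RefCountModel {
  private RefCountModel() {}

  static class CountedRegion {
    // A new object starts with one reference, owned by its creator.
    private final AtomicInteger refCnt = new AtomicInteger(1);
    private volatile boolean deallocated;

    /** Adds one reference (simplified: no check for already released objects). */
    CountedRegion retain() {
      refCnt.incrementAndGet();
      return this;
    }

    /** Drops one reference; returns true if this call freed the resource. */
    boolean release() {
      int remaining = refCnt.decrementAndGet();
      if (remaining < 0) {
        throw new IllegalStateException("released more often than retained");
      }
      if (remaining == 0) {
        deallocate();
        return true;
      }
      return false;
    }

    /** Cleanup hook; subclasses would close files, cancel readahead, etc. */
    protected void deallocate() {
      deallocated = true;
    }

    boolean isDeallocated() {
      return deallocated;
    }
  }

  public static void main(String[] args) {
    CountedRegion region = new CountedRegion().retain();
    System.out.println("freed by first release: " + region.release());
    System.out.println("freed by second release: " + region.release());
  }
}
```

If this reading is right, callers would normally go through release() rather than call deallocate() directly. I have not verified whether that applies to the operationComplete listener shown above.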
h3. Change category #5: Closing the channel:

*Code changes in 
@@ -305,7 +312,7 @@ public ReduceMapFileCount(ReduceContext rc) {
     public void operationComplete(ChannelFuture future) throws Exception {
       if (!future.isSuccess()) {
-        future.getChannel().close();
+        future.channel().closeFuture().awaitUninterruptibly();
       int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -313,12 +320,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
         // Let the idle timer handler close keep-alive connections
         if (reduceContext.getKeepAlive()) {
           TimeoutHandler timeoutHandler =
         } else {
-          future.getChannel().close();
+          future.channel().closeFuture().awaitUninterruptibly();
So, channel.close() has been replaced with the closeFuture().awaitUninterruptibly() call shown in the diff above.
 I couldn't find anything related to this in the migration guide.
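A note on the semantics, as far as I understand the Channel javadoc: closeFuture() only returns the future that is notified when the channel gets closed; it does not request the close itself. If that is correct, the new code waits for a close that somebody else has to trigger, which is not equivalent to the old close() call. The toy model below illustrates the difference in plain Java; ToyChannel is a made-up stand-in, not a Netty type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model: waiting on a close future vs. actually closing.
final class CloseFutureModel {
  private CloseFutureModel() {}

  static final class ToyChannel {
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    boolean isOpen() {
      return !closeFuture.isDone();
    }

    /** Only exposes the future; does not close anything. */
    CompletableFuture<Void> closeFuture() {
      return closeFuture;
    }

    /** Actually closes the channel, which completes the close future. */
    CompletableFuture<Void> close() {
      closeFuture.complete(null);
      return closeFuture;
    }
  }

  /** Waits on the close future only; returns true if the channel was closed in time. */
  static boolean awaitClosed(ToyChannel ch, long timeoutMillis) {
    try {
      ch.closeFuture().get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ToyChannel ch = new ToyChannel();
    System.out.println("closed by waiting only: " + awaitClosed(ch, 50));
    ch.close();
    System.out.println("closed after close(): " + awaitClosed(ch, 50));
  }
}
```

In the model, waiting alone never closes the channel; only close() does. Whether the branch relies on some other handler closing the channel is something I still need to confirm.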
h3. Change category #6: Idle state handling on channels:

The old code had an IdleStateHandler + TimeoutHandler in the pipeline:
    public ChannelPipeline getPipeline() throws Exception {
      ChannelPipeline pipeline = Channels.pipeline();
      if (sslFactory != null) {
        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
      pipeline.addLast("decoder", new HttpRequestDecoder());
      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
      pipeline.addLast("encoder", new HttpResponseEncoder());
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      pipeline.addLast("shuffle", SHUFFLE);
      pipeline.addLast("idle", idleStateHandler);
      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
      return pipeline;
*The instance of idleStateHandler was created with the constructor of 
  public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
      SHUFFLE = getShuffle(conf);
      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
        LOG.info("Encrypted shuffle is enabled.");
        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
      this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
The parameters are:
Timer timer,
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds
writerIdleTimeSeconds is assigned the value of the "connectionKeepAliveTimeOut" field, which is set from Hadoop's configuration.
 In other words, the value of "connectionKeepAliveTimeOut" is the number of seconds the IdleStateHandler waits without a write before it generates an IdleStateEvent.

*The second part of the handler logic is the TimeoutHandler, which is the last handler of the pipeline.*
 The implementation is so concise that I can paste it here as a reference:
static class TimeoutHandler extends IdleStateAwareChannelHandler {

    private boolean enabledTimeout;

    void setEnabledTimeout(boolean enabledTimeout) {
      this.enabledTimeout = enabledTimeout;
    }

    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
      // Close the connection only if the writer was idle and the timeout is enabled
      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
        e.getChannel().close();
      }
    }
}
The point is, this class will handle the IdleStateEvent generated by the 
 There's an additional flag called "enabledTimeout" that can enable or disable 
the timeout logic.

Let's see what the changes are:
 *6.1 IdleStateHandler:*
 Here's the javadoc of the IdleStateHandler class: 
 As IdleStateHandler is in a completely different class hierarchy, the Timer 
can't be passed to the constructor anymore.
 As a consequence, the org.apache.hadoop.mapred.ShuffleHandler#timer field is 
not required anymore, code changes:
@@ -267,7 +275,6 @@
   boolean connectionKeepAliveEnabled = false;
   private int connectionKeepAliveTimeOut;
   private int mapOutputMetaInfoCacheSize;
-  private Timer timer;
     // Timer is shared across entire factory and must be released separately
-    timer = new HashedWheelTimer();
     try {
-      pipelineFact = new HttpPipelineFactory(conf, timer);
+      pipelineFact = new HttpPipelineFactory(conf);
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     if (pipelineFact != null) {
-    if (timer != null) {
-      // Release this shared timer resource
-      timer.stop();
-    }
*6.2 TimeoutHandler:* 
 With Netty 4.x, there is no IdleStateAwareChannelHandler class anymore.
 The TimeoutHandler should extend IdleStateHandler and implement the 
channelIdle method: 
 The method name and its signature are the same, but the class has changed.
 Here's the code change:
-  static class TimeoutHandler extends IdleStateAwareChannelHandler {
+  static class TimeoutHandler extends IdleStateHandler {
     private boolean enabledTimeout;
+    public TimeoutHandler() {
+      super(1, 1, 1);
+    }
     void setEnabledTimeout(boolean enabledTimeout) {
       this.enabledTimeout = enabledTimeout;
     public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
h3. Change category #7: Server bootstrapping:

Netty 4.x has a new API for bootstrapping the server: 
{quote}The bootstrap API has been rewritten from scratch although its purpose stays same; it performs the typical steps required to make a server or a client up and running, often found in boilerplate code.

The new bootstrap also employs a fluent interface.{quote}
Let me list the required changes: 
 *7.1 Type-safe ChannelOptions: 
 The new server bootstrap API utilizes a type-safe way to modify socket 
 Code changes:
-    bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
-    bootstrap.setOption("child.keepAlive", true);
+    bootstrap.option(ChannelOption.SO_BACKLOG,
+        conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
+        .option(ChannelOption.SO_KEEPALIVE, true)
*7.2 Simplified shutdown: 
{quote}There's no more releaseExternalResources(). You can close all open channels immediately and make all I/O threads stop themselves by calling {quote}
Code changes:
@@ -577,17 +590,11 @@ protected void serviceStart() throws Exception {
   protected void serviceStop() throws Exception {
     accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    if (selector != null) {
-      ServerBootstrap bootstrap = new ServerBootstrap(selector);
-      bootstrap.releaseExternalResources();
-    }
     if (stateDb != null) {
*7.3 NioServerSocketChannelFactory --> NioEventLoopGroup: 
 This is a significant change as the old codebase was using 
HadoopExecutors.newCachedThreadPool for the boss and worker thread groups.
 This can be simplified to:
-    selector = new NioServerSocketChannelFactory(
-         HadoopExecutors.newCachedThreadPool(bossFactory),
-         HadoopExecutors.newCachedThreadPool(workerFactory),
-         maxShuffleThreads);
+    bossGroup = new NioEventLoopGroup(0, bossFactory);
+    workerGroup = new NioEventLoopGroup(0, workerFactory);
With the old Netty codebase, the constructor of NioServerSocketChannelFactory 
had this signature:
public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, 1, workerExecutor, workerCount);
The last parameter was the workerCount, which was taking the value of 
 The constructor that is being used with Netty 4.x:
     * Create a new instance using the specified number of threads, the given 
{@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link 
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
{color:#ff0000}With Wei-Chiu's code changes, 0 is passed as the thread count here. I think this is not correct, so I modified the code to pass the value of "maxShuffleThreads" as the first parameter:{color}
+    bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+    workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
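One caveat on the 0 value, as far as I know from the Netty 4 sources: nThreads == 0 does not mean "no threads". MultithreadEventLoopGroup falls back to a default, which is the io.netty.eventLoopThreads system property or, if that is unset, twice the number of available processors (at least 1). The sketch below models that rule in plain Java; the class, method and parameter names are mine, not Netty's.

```java
// Models how a requested thread count of 0 is resolved (simplified, no Netty dependency).
final class EventLoopThreadCount {
  private EventLoopThreadCount() {}

  /**
   * @param nThreads requested thread count; 0 means "use the default"
   * @param eventLoopThreadsProperty value of io.netty.eventLoopThreads, or null if unset
   * @param availableProcessors number of CPUs visible to the JVM
   */
  static int effectiveThreads(int nThreads, Integer eventLoopThreadsProperty,
      int availableProcessors) {
    if (nThreads != 0) {
      // Simplified: negative values are not validated here.
      return nThreads;
    }
    int fallback = (eventLoopThreadsProperty != null)
        ? eventLoopThreadsProperty
        : availableProcessors * 2;
    return Math.max(1, fallback);
  }

  public static void main(String[] args) {
    int cpus = Runtime.getRuntime().availableProcessors();
    Integer property = Integer.getInteger("io.netty.eventLoopThreads");
    System.out.println("nThreads=0 resolves to " + effectiveThreads(0, property, cpus));
  }
}
```

So passing 0 is a working configuration, but it ignores the configured "maxShuffleThreads", which is why I still prefer passing it explicitly for the worker group. For the boss group, the old factory used a boss count of 1 (see the constructor above), so passing "maxShuffleThreads" there may be more than needed; I'm not sure about this part.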
*7.4 The rest of the server init code in the serviceStart method is just 
adaptation to the new API:*
@@ -540,22 +550,25 @@ protected void serviceStart() throws Exception {
     userRsrc = new ConcurrentHashMap<String,String>();
     secretManager = new JobTokenSecretManager();
-    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    bootstrap = new ServerBootstrap();
+    bootstrap.group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class);
-    bootstrap.setPipelineFactory(pipelineFact);
+        .childHandler(pipelineFact);
-    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    ch = bootstrap.bind(new InetSocketAddress(port)).sync().channel();
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
     LOG.info(getName() + " listening on port " + port);

@@ -785,29 +792,33 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
*7.5 There's one important thing left: Initializing the channel group.*
 The old codebase was using this constructor of DefaultChannelGroup: 
 Here it had only one parameter: The name of the group.
 This is also mentioned in the architectural guide: 

However, Netty 4.x has a different ChannelGroup constructor: It has a name and 
a mandatory EventExecutor parameter: 
 There's nothing in the migration guide regarding this.
 Looking at this javadoc (interface of DefaultChannelGroup is ChannelGroup): 
 The only way channel groups are showcased is this, without any further 
explanation of the semantics:
 ChannelGroup allChannels =
         new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Wei-Chiu also felt this is a bit fishy:
+  // FIXME: need thread safety.
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
There's a related SO article: 
 {color:#ff0000}This is the part that I'm the most uncertain of.{color}
h3. *Change category #8: Channel initialization logic*

Details can be found here: 
{quote}ChannelPipelineFactory → ChannelInitializer
 As you noticed in the example above, there is no ChannelPipelineFactory anymore. It has been replaced with ChannelInitializer, which gives more control over Channel and ChannelPipeline configuration.
 Please note that you don't create a new ChannelPipeline by yourself. After observing many use cases reported so far, the Netty project team concluded that it has no benefit for a user to create his or her own pipeline implementation or to extend the default implementation. Therefore, ChannelPipeline is not created by a user anymore. ChannelPipeline is automatically created by a {quote}
Code changes accordingly:
-  class HttpPipelineFactory implements ChannelPipelineFactory {
+  class HttpPipelineFactory extends ChannelInitializer<SocketChannel> {
     final Shuffle SHUFFLE;
     private SSLFactory sslFactory;
-    private final ChannelHandler idleStateHandler;
-    public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
+    public HttpPipelineFactory(Configuration conf) throws Exception {
       SHUFFLE = getShuffle(conf);
       if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
                           MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
@@ -815,7 +826,7 @@ public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
         sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-      this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
     public Shuffle getSHUFFLE() {
@@ -828,27 +839,29 @@ public void destroy() {
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
+    @Override protected void initChannel(SocketChannel ch) throws Exception {
+      ChannelPipeline pipeline = ch.pipeline();
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
       pipeline.addLast("encoder", new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
-      pipeline.addLast("idle", idleStateHandler);
+      pipeline.addLast("idle", new IdleStateHandler(
+          0, connectionKeepAliveTimeOut, 0));
       pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
-      return pipeline;
h3. *Change category #9: Buffer API changes*

{quote}The utility class ChannelBuffers, which creates a new buffer, has been split into two utility classes, Unpooled and ByteBufUtil. As can be guessed from its name Unpooled, 4.0 introduced pooled ByteBufs which can be allocated via ByteBufAllocator implementations.{quote}
*Essentially, ChannelBuffers.copiedBuffer needed to be replaced with 
@@ -1312,7 +1325,7 @@ protected void sendError(ChannelHandlerContext ctx, String message,
     protected void sendError(ChannelHandlerContext ctx, String msg,
         HttpResponseStatus status, Map<String, String> headers) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
@@ -1322,18 +1335,17 @@ protected void sendError(ChannelHandlerContext ctx, String msg,
       for (Map.Entry<String, String> header : headers.entrySet()) {
         response.headers().set(header.getKey(), header.getValue());
-      response.setContent(
-          ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
       // Close the connection as soon as the error message is sent.

Also, DefaultFullHttpResponse became the go-to class (instead of 
DefaultHttpResponse) to construct a final HTTP response with a buffer.
h2. Questions to Wei-Chiu

[~weichiu] Could you please help me with answering these?

*1. Change category #5, closing channels with:*
Could you please tell me how you found out that a channel should be closed like this? As mentioned above, I couldn't find any resource to justify this.

*2. Question for 6.2 TimeoutHandler:* 
 Could you please tell me the reason for calling the super constructor with 1, 1, 1?
+    public TimeoutHandler() {
+      super(1, 1, 1);
+    }
*3. Question for 7.3: Initialization of EventLoopGroups:* 
 My updated code is:
+    bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+    workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
Am I missing something or was this an oversight on your side?

*4. What about point 2.2.2? Do you think what I'm saying there is okay?*

*5. Do you have any idea for point 7.5?*
h2. Test failures

Work is still in progress; I need to find out what causes the ShuffleHandler unit tests to fail.
 After I fix the tests, I will also manually check if MR behaves well on a 

