[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17358861#comment-17358861 ]
Szilard Nemeth commented on HADOOP-15327:
-----------------------------------------

Let me list the differences introduced by the migration from Netty 3.x to 4.x.
There is a migration guide that mentions most (but not all) of the changes: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html]
Please note that the code changes below are based on Wei-Chiu's branch: [https://github.com/jojochuang/hadoop/commits/shuffle_handler_netty4]

h2. CHANGES IN ShuffleHandler

h3. *I will list the changes mostly from ShuffleHandler, as it covers almost all types of changes made in the other classes as well.*
*In TestShuffleHandler, the test code was changed for one or more of the reasons listed below.*

h3. Change category #1: General API changes / non-configuration getters:
Details: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#general-api-changes]
{quote}Non-configuration getters have no get- prefix anymore. (e.g. Channel.getRemoteAddress() → Channel.remoteAddress())
Boolean properties are still prefixed with is- to avoid confusion (e.g. 'empty' is both an adjective and a verb, so empty() can have two meanings.)
{quote}
I'm listing all the changes without additional context (i.e. without naming the enclosing method), separated by three dots, as they are simple method renames:
{code:java}
- future.getChannel().close();
+ future.channel().closeFuture().awaitUninterruptibly();
...
...
- ChannelPipeline pipeline = future.getChannel().getPipeline();
+ ChannelPipeline pipeline = future.channel().pipeline();
...
...
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
...
...
- if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
-   e.getChannel().close();
+ if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+   ctx.channel().close();
...
...
- accepted.add(evt.getChannel());
+ accepted.add(ctx.channel());
...
...
- new QueryStringDecoder(request.getUri()).getParameters();
+ new QueryStringDecoder(request.getUri()).parameters(); //getUri was not changed, see this later
...
...
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
...
...
- reduceContext.getCtx().getChannel(),
+ reduceContext.getCtx().channel(),
...
...
- if (ch.getPipeline().get(SslHandler.class) == null) {
+ if (ch.pipeline().get(SslHandler.class) == null) {
...
...
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
...
...
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().write(response).addListener(ChannelFutureListener.CLOSE);
...
...
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
{code}

h3. Change category #2: General API changes / Method signature changes.

*2.1: SimpleChannelUpstreamHandler was renamed to ChannelInboundHandlerAdapter.*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#upstream--inbound-downstream--outbound]
{quote}The terms 'upstream' and 'downstream' were pretty confusing to beginners. 4.0 uses 'inbound' and 'outbound' wherever possible.
{quote}
{code:java}
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
{code}

*2.2: Simplified channel state model: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-channel-state-model]*
{quote}channelOpen, channelBound, and channelConnected have been merged to channelActive. channelDisconnected, channelUnbound, and channelClosed have been merged to channelInactive. Likewise, Channel.isBound() and isConnected() have been merged to isActive().
{quote}

*2.2.1 Changes in class: Shuffle*
{code:java}
  @Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
      throws Exception {
-   super.channelOpen(ctx, evt);
+   super.channelActive(ctx);
{code}

*2.2.2 Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught:*
Quoting the change again:
{quote}channelOpen, channelBound, and channelConnected have been merged to channelActive. channelDisconnected, channelUnbound, and channelClosed have been merged to channelInactive. Likewise, Channel.isBound() and isConnected() have been merged to isActive().
{quote}
{code:java}
    LOG.error("Shuffle error: ", cause);
-   if (ch.isConnected()) {
-     LOG.error("Shuffle error " + e);
+   if (ch.isOpen()) {
      sendError(ctx, INTERNAL_SERVER_ERROR);
    }
  }
{code}
{color:#ff0000}I think we have an issue here. The doc says: "Likewise, Channel.isBound() and isConnected() have been merged to isActive()."{color}
{color:#ff0000}So isOpen() should be replaced with isActive().{color}

*2.3 Change in method name: io.netty.channel.ChannelInboundHandlerAdapter#channelRead vs. old name: messageReceived()*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#case-study-porting-the-factorial-example]
Changes in class: Shuffle
{code:java}
  @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
      throws Exception {
-   HttpRequest request = (HttpRequest) evt.getMessage();
+   HttpRequest request = (HttpRequest) msg;
{code}

*2.4 The method parameter of channelRead vs. messageReceived was also changed.*
This is detailed here: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelhandler-with-no-event-object]
Changes in class: Shuffle
{code:java}
  @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
      throws Exception {
-   HttpRequest request = (HttpRequest) evt.getMessage();
+   HttpRequest request = (HttpRequest) msg;
{code}
Consequently, as no event object is received, the channel has to be obtained from the context instead of the event:
{code:java}
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
{code}

*2.5 ChannelHandler method signature changes: exceptionCaught*
[https://netty.io/wiki/new-and-noteworthy-in-4.0.html#new-channelhandler-type-hierarchy]
The ExceptionEvent is not passed anymore; the cause of the issue is passed directly as a Throwable instead. Consequently, we no longer get the cause object from the event.
Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#exceptionCaught:
{code:java}
  @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
      throws Exception {
-   Throwable cause = e.getCause();
{code}

h3. Change category #3: Behavioral change: write() does not flush automatically:
Details: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#write-does-not-flush-automatically]

*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#channelRead:*
{code:java}
    populateHeaders(mapIds, jobId, user, reduceId, request, response, keepAliveParam, mapOutputInfoMap);
  } catch(IOException e) {
-   ch.write(response);
+   ch.writeAndFlush(response);
...
...
- ch.write(response);
+ ch.writeAndFlush(response);
  //Initialize one ReduceContext object per messageReceived call
  boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
  ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
{code}

*Changes in org.apache.hadoop.mapred.ShuffleHandler.Shuffle#sendMapOutput:*
{code:java}
@@ -1259,7 +1272,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
      new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
  final DataOutputBuffer dob = new DataOutputBuffer();
  header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
...
...
  ChannelFuture writeFuture;
  final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
      info.startOffset, info.partLength, manageOsCache, readaheadLength,
      readaheadPool, spillfile.getAbsolutePath(),
      shuffleBufferSize, shuffleTransferToAllowed);
- writeFuture = ch.write(partition);
+ writeFuture = ch.writeAndFlush(partition);
...
...
      info.startOffset, info.partLength, sslFileBufferSize,
      manageOsCache, readaheadLength, readaheadPool,
      spillfile.getAbsolutePath());
- writeFuture = ch.write(chunk);
+ writeFuture = ch.writeAndFlush(chunk);
  }
  metrics.shuffleConnections.incr();
{code}

h3. Change category #4: Releasing resources of / Deallocating io.netty.channel.DefaultFileRegion:
I couldn't really find any reference in the migration guide for this one.
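For context (this comes from the Netty 4 class hierarchy rather than from the migration guide): DefaultFileRegion extends AbstractReferenceCounted, where deallocate() is the protected hook that release() invokes once the reference count drops to zero. Below is a minimal, JDK-only sketch of that pattern. The class names RefCountedSketch, FileRegionSketch and FadvisedRegionSketch are made up for illustration and are not Netty or Hadoop classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the reference-counting base class; NOT Netty code.
abstract class RefCountedSketch {
  // A freshly created object starts with one reference.
  private final AtomicInteger refCnt = new AtomicInteger(1);

  int refCnt() {
    return refCnt.get();
  }

  RefCountedSketch retain() {
    int prev;
    do {
      prev = refCnt.get();
      if (prev <= 0) {
        throw new IllegalStateException("already deallocated");
      }
    } while (!refCnt.compareAndSet(prev, prev + 1));
    return this;
  }

  // Returns true only for the call that dropped the count to zero.
  boolean release() {
    int prev;
    do {
      prev = refCnt.get();
      if (prev <= 0) {
        throw new IllegalStateException("already deallocated");
      }
    } while (!refCnt.compareAndSet(prev, prev - 1));
    if (prev == 1) {
      deallocate();
      return true;
    }
    return false;
  }

  // Subclasses free their resources here; invoked once, by release().
  protected abstract void deallocate();
}

// Hypothetical stand-in for a file region that owns a file handle.
class FileRegionSketch extends RefCountedSketch {
  int closeCount = 0;

  @Override
  protected void deallocate() {
    closeCount++; // a real region would close its file here
  }
}

// Hypothetical stand-in for a subclass that has extra cleanup to do first.
class FadvisedRegionSketch extends FileRegionSketch {
  boolean readaheadCancelled = false;

  @Override
  protected void deallocate() {
    readaheadCancelled = true; // subclass-specific cleanup
    super.deallocate();        // then let the parent free its resources
  }
}
```

If this model carries over to FadvisedFileRegion, callers would normally go through release() rather than invoking deallocate() directly. I have not verified whether that distinction matters for the listener in ShuffleHandler.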
*I can see this change in ShuffleHandler:*
{code:java}
@@ -1284,7 +1297,7 @@ public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
          partition.transferSuccessful();
        }
-       partition.releaseExternalResources();
+       partition.deallocate();
      }
    });
  } else {
{code}
*This calls FadvisedFileRegion#deallocate (formerly releaseExternalResources):*
{code:java}
  @Override
- public void releaseExternalResources() {
+ protected void deallocate() {
    if (readaheadRequest != null) {
      readaheadRequest.cancel();
    }
-   super.releaseExternalResources();
+   super.deallocate();
  }
{code}
There's also a related change in TestFadvisedFileRegion.

h3. Change category #5: Closing the channel:
*Code changes in org.apache.hadoop.mapred.ShuffleHandler.ReduceMapFileCount#operationComplete:*
{code:java}
@@ -305,7 +312,7 @@ public ReduceMapFileCount(ReduceContext rc) {
  @Override
  public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
-     future.getChannel().close();
+     future.channel().closeFuture().awaitUninterruptibly();
      return;
    }
    int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -313,12 +320,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
      metrics.operationComplete(future);
      // Let the idle timer handler close keep-alive connections
      if (reduceContext.getKeepAlive()) {
        TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
        timeoutHandler.setEnabledTimeout(true);
      } else {
-       future.getChannel().close();
+       future.channel().closeFuture().awaitUninterruptibly();
      }
{code}
So, channel.close() has been replaced with channel.closeFuture().awaitUninterruptibly(). I couldn't find anything related to this in the migration guide.

h3. Change category #6: Idle state handling on channels:
The old code had an IdleStateHandler + TimeoutHandler in the pipeline:
{code:java}
@Override
public ChannelPipeline getPipeline() throws Exception {
  ChannelPipeline pipeline = Channels.pipeline();
  if (sslFactory != null) {
    pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
  }
  pipeline.addLast("decoder", new HttpRequestDecoder());
  pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
  pipeline.addLast("encoder", new HttpResponseEncoder());
  pipeline.addLast("chunking", new ChunkedWriteHandler());
  pipeline.addLast("shuffle", SHUFFLE);
  pipeline.addLast("idle", idleStateHandler);
  pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
  return pipeline;
}
{code}
*The idleStateHandler instance was created in the constructor of HttpPipelineFactory:*
{code:java}
public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
  SHUFFLE = getShuffle(conf);
  if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
      MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
    LOG.info("Encrypted shuffle is enabled.");
    sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
    sslFactory.init();
  }
  this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
}
{code}
The parameters are:
{code:java}
Timer timer,
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds
{code}
The writerIdleTimeSeconds parameter takes the value of the field named "connectionKeepAliveTimeOut", which is set from Hadoop's configuration.
Whatever value that field holds is the number of seconds the IdleStateHandler waits before it generates an IdleStateEvent.
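To make the writer-idle semantics concrete, here is a small JDK-only sketch of the check described above. It is an illustration only, not Netty's implementation: the class WriterIdleSketch and its methods are hypothetical, and the clock is passed in explicitly so that the behaviour is deterministic. The one Netty fact relied upon is that an idle time of 0 disables the corresponding check, which is why 0 is passed for the reader and all-idle times.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating a writer-idle threshold; NOT Netty or Hadoop code.
final class WriterIdleSketch {
  private final long timeoutNanos;
  private long lastWriteNanos;

  // writerIdleTimeSeconds plays the role of connectionKeepAliveTimeOut.
  WriterIdleSketch(int writerIdleTimeSeconds, long nowNanos) {
    this.timeoutNanos = TimeUnit.SECONDS.toNanos(writerIdleTimeSeconds);
    this.lastWriteNanos = nowNanos;
  }

  // Every write resets the idle period.
  void onWrite(long nowNanos) {
    lastWriteNanos = nowNanos;
  }

  // True once no write has happened for at least the configured number of seconds.
  // A timeout of 0 means the check is disabled.
  boolean isWriterIdle(long nowNanos) {
    return timeoutNanos > 0 && nowNanos - lastWriteNanos >= timeoutNanos;
  }
}
```

With a timeout of 5 seconds, the sketch reports the channel as idle 5 seconds after the last write, and a write in between restarts the countdown.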
*The second part of the handler logic is the TimeoutHandler, which is the last handler in the pipeline.*
The implementation is concise enough to paste here for reference:
{code:java}
static class TimeoutHandler extends IdleStateAwareChannelHandler {

  private boolean enabledTimeout;

  void setEnabledTimeout(boolean enabledTimeout) {
    this.enabledTimeout = enabledTimeout;
  }

  @Override
  public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
    if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
      e.getChannel().close();
    }
  }
}
{code}
The point is that this class handles the IdleStateEvent generated by the IdleStateHandler. There's an additional flag called "enabledTimeout" that can enable or disable the timeout logic.

Let's see what the changes are:

*6.1 IdleStateHandler:*
Here's the javadoc of the IdleStateHandler class: [https://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html]
As IdleStateHandler is in a completely different class hierarchy, the Timer can't be passed to the constructor anymore.
As a consequence, the org.apache.hadoop.mapred.ShuffleHandler#timer field is not required anymore. Code changes:
{code:java}
@@ -267,7 +275,6 @@ boolean connectionKeepAliveEnabled = false;
  private int connectionKeepAliveTimeOut;
  private int mapOutputMetaInfoCacheSize;
- private Timer timer;
...
...
  // Timer is shared across entire factory and must be released separately
- timer = new HashedWheelTimer();
  try {
-   pipelineFact = new HttpPipelineFactory(conf, timer);
+   pipelineFact = new HttpPipelineFactory(conf);
  } catch (Exception ex) {
    throw new RuntimeException(ex);
  }
...
...
  if (pipelineFact != null) {
    pipelineFact.destroy();
  }
- if (timer != null) {
-   // Release this shared timer resource
-   timer.stop();
- }
{code}

*6.2 TimeoutHandler:*
With Netty 4.x, there is no IdleStateAwareChannelHandler class anymore.
The TimeoutHandler should extend IdleStateHandler and implement the channelIdle method: [https://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html#channelIdle-io.netty.channel.ChannelHandlerContext-io.netty.handler.timeout.IdleStateEvent-]
The method name and its signature are the same, but the superclass has changed. Here's the code change:
{code:java}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ static class TimeoutHandler extends IdleStateHandler {
    private boolean enabledTimeout;
+   public TimeoutHandler() {
+     super(1, 1, 1);
+   }
+
    void setEnabledTimeout(boolean enabledTimeout) {
      this.enabledTimeout = enabledTimeout;
    }
    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
    }
    }
  }
{code}

h3. Change category #7: Server bootstrapping:
Netty 4.x has a new API for bootstrapping the server: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#new-bootstrap-api]
{quote}The bootstrap API has been rewritten from scratch although its purpose stays same; it performs the typical steps required to make a server or a client up and running, often found in boilerplate code. The new bootstrap also employs a fluent interface.
{quote}
Let me list the required changes:

*7.1 Type-safe ChannelOptions: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#type-safe-channeloption]*
The new server bootstrap API provides a type-safe way to set socket options.
Code changes:
{code:java}
- bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
-     DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE));
- bootstrap.setOption("child.keepAlive", true);
+ bootstrap.option(ChannelOption.SO_BACKLOG,
+     conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
+         DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
+     .option(ChannelOption.SO_KEEPALIVE, true)
{code}

*7.2 Simplified shutdown: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#simplified-shutdown]*
{quote}There's no more releaseExternalResources().
You can close all open channels immediately and make all I/O threads stop themselves by calling EventLoopGroup.shutdownGracefully().
{quote}
Code changes:
{code:java}
@@ -577,17 +590,11 @@ protected void serviceStart() throws Exception {
  @Override
  protected void serviceStop() throws Exception {
    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-   if (selector != null) {
-     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-     bootstrap.releaseExternalResources();
-   }
    if (stateDb != null) {
      stateDb.close();
    }
{code}

*7.3 NioServerSocketChannelFactory --> NioEventLoopGroup: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#flexible-io-thread-allocation]*
This is a significant change, as the old codebase was using HadoopExecutors.newCachedThreadPool for the boss and worker thread groups.
This can be simplified to:
{code:java}
- selector = new NioServerSocketChannelFactory(
-     HadoopExecutors.newCachedThreadPool(bossFactory),
-     HadoopExecutors.newCachedThreadPool(workerFactory),
-     maxShuffleThreads);
+ bossGroup = new NioEventLoopGroup(0, bossFactory);
+ workerGroup = new NioEventLoopGroup(0, workerFactory);
{code}
With the old Netty codebase, the constructor of NioServerSocketChannelFactory had this signature:
{code:java}
public NioServerSocketChannelFactory(
    Executor bossExecutor, Executor workerExecutor,
    int workerCount) {
  this(bossExecutor, 1, workerExecutor, workerCount);
}
{code}
The last parameter was the workerCount, which took the value of maxShuffleThreads.
This is the constructor that is used with Netty 4.x:
{code:java}
/**
 * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the
 * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
 */
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
  this(nThreads, threadFactory, SelectorProvider.provider());
}
{code}
{color:#ff0000}With Wei-Chiu's code changes, 0 is passed as nThreads here.
I think this is not correct, so I modified the code to pass the value of "maxShuffleThreads" as the first parameter:{color}
{code:java}
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
{code}

*7.4 The rest of the server init code in the serviceStart method is just adaptation to the new API:*
{code:java}
@@ -540,22 +550,25 @@ protected void serviceStart() throws Exception {
  userRsrc = new ConcurrentHashMap<String,String>();
  secretManager = new JobTokenSecretManager();
  recoverState(conf);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
+     .channel(NioServerSocketChannel.class);
+
- bootstrap.setPipelineFactory(pipelineFact);
+     .childHandler(pipelineFact);
  port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ ch = bootstrap.bind(new InetSocketAddress(port)).sync().channel();
  accepted.add(ch);
  conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
  pipelineFact.SHUFFLE.setPort(port);
  LOG.info(getName() + " listening on port " + port);
@@ -785,29 +792,33 @@ private void removeJobShuffleInfo(JobID jobId) throws IOException {
    }
  }
{code}

*7.5 There's one important thing left: Initializing the channel group.*
The old codebase was using this constructor of DefaultChannelGroup: [https://docs.jboss.org/netty/3.1/api/org/jboss/netty/channel/group/DefaultChannelGroup.html#DefaultChannelGroup(java.lang.String)]
Here it had only one parameter: The name of the group.
This is also mentioned in the architectural guide: [https://netty.io/3.8/guide/#example.discard3.co1]
However, Netty 4.x has a different ChannelGroup constructor: it takes a name and a mandatory EventExecutor parameter: [https://netty.io/4.0/api/io/netty/channel/group/DefaultChannelGroup.html#DefaultChannelGroup-java.lang.String-io.netty.util.concurrent.EventExecutor-]
There's nothing in the migration guide regarding this.
Looking at this javadoc (the interface of DefaultChannelGroup is ChannelGroup): [https://netty.io/4.0/api/io/netty/channel/group/ChannelGroup.html]
The only way channel groups are showcased is this, without any further explanation of the semantics:
{code:java}
ChannelGroup allChannels =
    new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
{code}
Wei-Chiu also felt this is a bit fishy:
{code:java}
+ // FIXME: need thread safety.
+ private final ChannelGroup accepted =
+     new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
{code}
There's a related SO article: [https://stackoverflow.com/questions/17836976/netty-4-0-instanciate-defaultchannelgroup]
{color:#ff0000}This is the part that I'm the most uncertain of.{color}

h3. *Change category #8: Channel initialization logic*
Details can be found here: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelpipelinefactory--channelinitializer]
{quote}ChannelPipelineFactory → ChannelInitializer
As you noticed in the example above, there is no ChannelPipelineFactory anymore. It has been replaced with ChannelInitializer, which gives more control over Channel and ChannelPipeline configuration.
Please note that you don't create a new ChannelPipeline by yourself. After observing many use cases reported so far, the Netty project team concluded that it has no benefit for a user to create his or her own pipeline implementation or to extend the default implementation. Therefore, ChannelPipeline is not created by a user anymore. ChannelPipeline is automatically created by a Channel.
{quote}
Code changes accordingly:
{code:java}
- class HttpPipelineFactory implements ChannelPipelineFactory {
+ class HttpPipelineFactory extends ChannelInitializer<SocketChannel> {
    final Shuffle SHUFFLE;
    private SSLFactory sslFactory;
-   private final ChannelHandler idleStateHandler;
-   public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
+   public HttpPipelineFactory(Configuration conf) throws Exception {
      SHUFFLE = getShuffle(conf);
      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
@@ -815,7 +826,7 @@ public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
        sslFactory.init();
      }
-     this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
    }
    public Shuffle getSHUFFLE() {
@@ -828,27 +839,29 @@ public void destroy() {
      }
    }
-   @Override
-   public ChannelPipeline getPipeline() throws Exception {
-     ChannelPipeline pipeline = Channels.pipeline();
+   @Override protected void initChannel(SocketChannel ch) throws Exception {
+     ChannelPipeline pipeline = ch.pipeline();
      if (sslFactory != null) {
        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
      }
      pipeline.addLast("decoder", new HttpRequestDecoder());
-     pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+     pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
      pipeline.addLast("encoder", new HttpResponseEncoder());
      pipeline.addLast("chunking", new ChunkedWriteHandler());
      pipeline.addLast("shuffle", SHUFFLE);
-     pipeline.addLast("idle", idleStateHandler);
+     pipeline.addLast("idle", new IdleStateHandler(
+         0, connectionKeepAliveTimeOut, 0));
      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
-     return pipeline;
    }
  }
{code}

h3. *Change category #9: Buffer API changes*
Details: [https://netty.io/wiki/new-and-noteworthy-in-4.0.html#channelbuffer--bytebuf]
{quote}The utility class ChannelBuffers, which creates a new buffer, has been split into two utility classes, Unpooled and ByteBufUtil. As can be guessed from its name Unpooled, 4.0 introduced pooled ByteBufs which can be allocated via ByteBufAllocator implementations.
{quote}
*Essentially, ChannelBuffers.copiedBuffer needed to be replaced with Unpooled.copiedBuffer:*
{code:java}
@@ -1312,7 +1325,7 @@ protected void sendError(ChannelHandlerContext ctx, String message,
  protected void sendError(ChannelHandlerContext ctx, String msg,
      HttpResponseStatus status, Map<String, String> headers) {
-   HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+   FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
    // Put shuffle version into http header
    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
@@ -1322,18 +1335,17 @@ protected void sendError(ChannelHandlerContext ctx, String msg,
    for (Map.Entry<String, String> header : headers.entrySet()) {
      response.headers().set(header.getKey(), header.getValue());
    }
-   response.setContent(
-       ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
    // Close the connection as soon as the error message is sent.
  }
{code}
Also, DefaultFullHttpResponse became the go-to class (instead of DefaultHttpResponse) to construct a final HTTP response with a buffer.

----
h2. Questions to Wei-Chiu

[~weichiu] Could you please help me with answering these?

*1. Change category #5, closing channels with:*
{code:java}
future.channel().closeFuture().awaitUninterruptibly()
{code}
Could you please tell me how you arrived at closing a channel like this? As mentioned above, I couldn't find any resource to justify this.

*2. Question for 6.2 TimeoutHandler:*
Could you please tell me the reason for calling the super constructor with 1, 1, 1?
{code:java}
+ public TimeoutHandler() {
+   super(1, 1, 1);
+ }
+
{code}

*3. Question for 7.3: Initialization of EventLoopGroups:*
My updated code is:
{code:java}
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
{code}
Am I missing something, or was this an oversight on your side?

*4. What about point 2.2.2? Do you think what I'm saying there is correct?*

*5. Do you have any ideas for point 7.5?*

----
h2. Test failures

Work is still in progress; I need to find out what causes the unit tests of the ShuffleHandler to fail.
Once I have fixed the tests, I will also manually check whether MR behaves well on a cluster.

> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
> Key: HADOOP-15327
> URL: https://issues.apache.org/jira/browse/HADOOP-15327
> Project: Hadoop Common
> Issue Type: Sub-task
> Reporter: Xiaoyu Yao
> Assignee: Szilard Nemeth
> Priority: Major
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)