szilard-nemeth commented on a change in pull request #3259: URL: https://github.com/apache/hadoop/pull/3259#discussion_r684177660
########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java ########## @@ -828,27 +893,44 @@ public void destroy() { } } - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); + @Override protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); if (sslFactory != null) { pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); } pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); - pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16)); + pipeline.addLast(ENCODER_HANDLER_NAME, new HttpResponseEncoder()); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", SHUFFLE); - pipeline.addLast("idle", idleStateHandler); - pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler()); - return pipeline; + addOutboundHandlersIfRequired(pipeline); + pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler(connectionKeepAliveTimeOut)); // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle // TODO factor out decode of index to permit alt. models } + + private void addOutboundHandlersIfRequired(ChannelPipeline pipeline) { + if (useOutboundExceptionHandler) { + //https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler + pipeline.addLast("outboundExceptionHandler", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); + super.write(ctx, msg, promise); + } + }); + } + if (useOutboundLogger) { + //Replace HttpResponseEncoder with LoggingHttpResponseEncoder + //Need to use the same name as before, otherwise we would have 2 encoders + pipeline.replace(ENCODER_HANDLER_NAME, ENCODER_HANDLER_NAME, new LoggingHttpResponseEncoder(false)); Review comment: Makes sense, inlined the method and got rid of the pipeline.replace -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org