szilard-nemeth commented on a change in pull request #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r684177660



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
##########
@@ -828,27 +893,44 @@ public void destroy() {
       }
     }
 
-    @Override
-    public ChannelPipeline getPipeline() throws Exception {
-      ChannelPipeline pipeline = Channels.pipeline();
+    @Override protected void initChannel(SocketChannel ch) throws Exception {
+      ChannelPipeline pipeline = ch.pipeline();
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       }
       pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
+      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+      pipeline.addLast(ENCODER_HANDLER_NAME, new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", SHUFFLE);
-      pipeline.addLast("idle", idleStateHandler);
-      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
-      return pipeline;
+      addOutboundHandlersIfRequired(pipeline);
+      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler(connectionKeepAliveTimeOut));
       // TODO factor security manager into pipeline
       // TODO factor out encode/decode to permit binary shuffle
       // TODO factor out decode of index to permit alt. models
     }
+
+    private void addOutboundHandlersIfRequired(ChannelPipeline pipeline) {
+      if (useOutboundExceptionHandler) {
+        //https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler
+        pipeline.addLast("outboundExceptionHandler", new ChannelOutboundHandlerAdapter() {
+          @Override
+          public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
+            super.write(ctx, msg, promise);
+          }
+        });
+      }
+      if (useOutboundLogger) {
+        //Replace HttpResponseEncoder with LoggingHttpResponseEncoder
+        //Need to use the same name as before, otherwise we would have 2 encoders
+        pipeline.replace(ENCODER_HANDLER_NAME, ENCODER_HANDLER_NAME, new LoggingHttpResponseEncoder(false));

Review comment:
       Makes sense. I inlined the method and got rid of the pipeline.replace call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to