[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631149#comment-17631149 ]

ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------

K0K0V0K commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018076397


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService {
 
   public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
       new HttpResponseStatus(429, "TOO MANY REQUESTS");
-  // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
+  // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
+  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
-  private ChannelFactory selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup();
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private ServerBootstrap bootstrap;
+  private Channel ch;
+  private final ChannelGroup accepted =
+      new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());

Review Comment:
   Maybe a line comment could explain why we need to create an event executor group with 5 threads here.
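   
   A possible shape for that, as a sketch: extract the magic number into a named constant and attach the rationale to it. The constant name and comment text below are illustrative, not from the patch:
   
   ```java
   // Sketch only: the constant name and rationale comment are illustrative.
   // Threads backing the ChannelGroup's shared event executor; the patch
   // currently hard-codes 5 without saying why.
   private static final int EVENT_EXECUTOR_THREAD_COUNT = 5;
   
   private final ChannelGroup accepted =
       new DefaultChannelGroup(
           new DefaultEventExecutorGroup(EVENT_EXECUTOR_THREAD_COUNT).next());
   ```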



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java:
##########
@@ -72,19 +73,21 @@
   private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
 
   protected final Reporter reporter;
-  private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
+  @VisibleForTesting
+  public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
-  
-  private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+
+  @VisibleForTesting
+  public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

Review Comment:
   Won't checkstyle complain about `public final static` instead of `public static final`?
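   
   For reference, checkstyle's ModifierOrder check (assuming it is enabled in the project's checkstyle config) enforces the modifier order recommended by the JLS, so the old order would indeed be flagged:
   
   ```java
   // Flagged by checkstyle's ModifierOrder check (if enabled):
   // public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   
   // JLS-recommended order, which the check enforces:
   public static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   ```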



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = activeConnections.incrementAndGet();
+      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
        LOG.info(String.format("Current number of shuffle connections (%d) is " + 
-            "greater than or equal to the max allowed shuffle connections (%d)", 
+            "greater than the max allowed shuffle connections (%d)",
            accepted.size(), maxShuffleConnections));
 
-        Map<String, String> headers = new HashMap<String, String>(1);
+        Map<String, String> headers = new HashMap<>(1);
         // notify fetchers to backoff for a while before closing the connection
         // if the shuffle connection limit is hit. Fetchers are expected to
         // handle this notification gracefully, that is, not treating this as a
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);
+        accepted.add(ctx.channel());
+        LOG.debug("Added channel: {}, channel id: {}. Accepted number of 
connections={}",
+            ctx.channel(), ctx.channel().id(), activeConnections.get());
       }
-      accepted.add(evt.getChannel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      NettyChannelHelper.channelInactive(ctx.channel());
+      super.channelInactive(ctx);
+      int noOfConnections = activeConnections.decrementAndGet();
+      LOG.debug("New value of Accepted number of connections={}", 
noOfConnections);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
+      Channel channel = ctx.channel();
+      LOG.trace("Executing channelRead, channel id: {}", channel.id());
+      HttpRequest request = (HttpRequest) msg;
+      LOG.debug("Received HTTP request: {}, channel id: {}", request, 
channel.id());
+      if (request.method() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
       }
       // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.headers() != null ?
-              request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.headers() != null ?
-                  request.headers()
-                      .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+      String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+      String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;

Review Comment:
   Shouldn't this be DEFAULT_HTTP_HEADER_NAME?
   (Also, do we still need the if from line #1045?)
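   
   For context, in the removed code HTTP_HEADER_NAME is the header key and DEFAULT_HTTP_HEADER_NAME is the expected value of that header. A null-safe version of the old check might read like this (a sketch reconstructed from the removed lines above, not the patch's final code; the sendError/BAD_REQUEST usage is assumed to follow the surrounding handler):
   
   ```java
   // HTTP_HEADER_NAME / HTTP_HEADER_VERSION are the header *keys*;
   // the DEFAULT_* constants are the expected *values*.
   HttpHeaders headers = request.headers();
   String name = headers != null ? headers.get(ShuffleHeader.HTTP_HEADER_NAME) : null;
   String version = headers != null ? headers.get(ShuffleHeader.HTTP_HEADER_VERSION) : null;
   if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(name)
       || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(version)) {
     sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
     return;
   }
   ```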



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties:
##########
@@ -17,3 +17,5 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
+log4j.logger.io.netty=DEBUG
+log4j.logger.org.apache.hadoop.mapred=DEBUG

Review Comment:
   Won't this slow down the build process too much?



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -291,36 +302,86 @@ public void operationComplete(ChannelFuture future) throws Exception {
     }
   }
 
+  static class NettyChannelHelper {
+    static ChannelFuture writeToChannel(Channel ch, Object obj) {
+      LOG.debug("Writing {} to channel: {}", obj.getClass().getSimpleName(), 
ch.id());
+      return ch.writeAndFlush(obj);
+    }
+
+    static ChannelFuture writeToChannelAndClose(Channel ch, Object obj) {
+      return writeToChannel(ch, obj).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    static ChannelFuture writeToChannelAndAddLastHttpContent(Channel ch, HttpResponse obj) {
+      writeToChannel(ch, obj);
+      return writeLastHttpContentToChannel(ch);
+    }
+
+    static ChannelFuture writeLastHttpContentToChannel(Channel ch) {
+      LOG.debug("Writing LastHttpContent, channel id: {}", ch.id());
+      return ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+    }
+
+    static ChannelFuture closeChannel(Channel ch) {
+      LOG.debug("Closing channel, channel id: {}", ch.id());
+      return ch.close();
+    }
+
+    static void closeChannels(ChannelGroup channelGroup) {
+      channelGroup.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    }
+
+    public static ChannelFuture closeAsIdle(Channel channel, int timeout) {
+      LOG.debug("Closing channel as writer was idle for {} seconds", timeout);
+      return closeChannel(channel);
+    }
+
+    public static void channelActive(Channel ch) {
+      LOG.debug("Executing channelActive, channel id: {}", ch.id());
+    }
+
+    public static void channelInactive(Channel channel) {
+      LOG.debug("Executing channelInactive, channel id: {}", channel.id());
+    }

Review Comment:
   Do we need the public keyword here?
   (If not, and we are changing these lines anyway, maybe channelActive could be renamed to logChannelActive, same for inactive, and the channel parameter could be renamed to ch.)
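   
   A minimal sketch of that suggestion (the method names come from this comment, not from the patch):
   
   ```java
   // Package-private logging helpers with uniform parameter naming.
   static void logChannelActive(Channel ch) {
     LOG.debug("Executing channelActive, channel id: {}", ch.id());
   }
   
   static void logChannelInactive(Channel ch) {
     LOG.debug("Executing channelInactive, channel id: {}", ch.id());
   }
   ```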



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -668,34 +1357,61 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
       conns[i].connect();
     }
 
-    //Ensure first connections are okay
-    conns[0].getInputStream();
-    int rc = conns[0].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-    
-    conns[1].getInputStream();
-    rc = conns[1].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-
-    // This connection should be closed because it to above the limit
-    try {
-      rc = conns[2].getResponseCode();
-      Assert.assertEquals("Expected a too-many-requests response code",
-          ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
-      long backoff = Long.valueOf(
-          conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
-      Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
-      conns[2].getInputStream();
-      Assert.fail("Expected an IOException");
-    } catch (IOException ioe) {
-      LOG.info("Expected - connection should not be open");
-    } catch (NumberFormatException ne) {
-      Assert.fail("Expected a numerical value for RETRY_AFTER header field");
-    } catch (Exception e) {
-      Assert.fail("Expected a IOException");
+    Map<Integer, List<HttpURLConnection>> mapOfConnections = Maps.newHashMap();
+    for (HttpURLConnection conn : conns) {
+      try {
+        conn.getInputStream();
+      } catch (IOException ioe) {
+        LOG.info("Expected - connection should not be open");
+      } catch (NumberFormatException ne) {
+        fail("Expected a numerical value for RETRY_AFTER header field");
+      } catch (Exception e) {
+        fail("Expected a IOException");
+      }
+      int statusCode = conn.getResponseCode();
+      LOG.debug("Connection status code: {}", statusCode);
+      mapOfConnections.putIfAbsent(statusCode, new ArrayList<>());
+      List<HttpURLConnection> connectionList = 
mapOfConnections.get(statusCode);
+      connectionList.add(conn);
     }
+
+    assertEquals(String.format("Expected only %s and %s response",
+            OK_STATUS, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        Sets.newHashSet(
+            HttpURLConnection.HTTP_OK,
+            ShuffleHandler.TOO_MANY_REQ_STATUS.code()),
+        mapOfConnections.keySet());
     
-    shuffleHandler.stop(); 
+    List<HttpURLConnection> successfulConnections =
+        mapOfConnections.get(HttpURLConnection.HTTP_OK);
+    assertEquals(String.format("Expected exactly %d requests " +
+            "with %s response", maxAllowedConnections, OK_STATUS),
+        maxAllowedConnections, successfulConnections.size());
+
+    //Ensure exactly one connection is HTTP 429 (TOO MANY REQUESTS)
+    List<HttpURLConnection> closedConnections =
+        mapOfConnections.get(ShuffleHandler.TOO_MANY_REQ_STATUS.code());
+    assertEquals(String.format("Expected exactly %d %s response",
+            notAcceptedConnections, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        notAcceptedConnections, closedConnections.size());
+
+    // This connection should be closed because it is above the maximum limit
+    HttpURLConnection conn = closedConnections.get(0);
+    assertEquals(String.format("Expected a %s response",
+            ShuffleHandler.TOO_MANY_REQ_STATUS),
+        ShuffleHandler.TOO_MANY_REQ_STATUS.code(), conn.getResponseCode());
+    long backoff = Long.parseLong(
+        conn.getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+    assertTrue("The backoff value cannot be negative.", backoff > 0);
+
+    shuffleHandler.stop();
+
+    //It's okay to get a ClosedChannelException.
+    //All other kinds of exceptions means something went wrong
+    assertEquals("Should have no caught exceptions",
+        Collections.emptyList(), failures.stream()
+            .filter(f -> !(f instanceof ClosedChannelException))
+            .collect(toList()));

Review Comment:
   Maybe there could be a cleanup step here that closes every connection, to make sure we don't keep unused resources allocated? Or would that be too much?
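   
   A minimal sketch of such a cleanup, assuming conns is the HttpURLConnection array the test opened earlier; it could run in a finally block or an @After method:
   
   ```java
   // Best-effort cleanup: release every connection the test opened, whether or
   // not the server side already closed it.
   for (HttpURLConnection conn : conns) {
     if (conn != null) {
       conn.disconnect();
     }
   }
   ```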





> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, 
> HADOOP-15327.005.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, 
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, 
> testfailure-testReduceFromPartialMem.zip
>
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
