[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631244#comment-17631244 ]

ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------

9uapaw commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018346414


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java:
##########
@@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) {
     }
 
     @Override
-    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+    public void channelActive(ChannelHandlerContext ctx)
         throws Exception {
-      super.channelOpen(ctx, evt);
-
-      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+      NettyChannelHelper.channelActive(ctx.channel());
+      int numConnections = activeConnections.incrementAndGet();
+      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
         LOG.info(String.format("Current number of shuffle connections (%d) is 
" + 
-            "greater than or equal to the max allowed shuffle connections 
(%d)", 
+            "greater than the max allowed shuffle connections (%d)",
             accepted.size(), maxShuffleConnections));
 
-        Map<String, String> headers = new HashMap<String, String>(1);
+        Map<String, String> headers = new HashMap<>(1);
         // notify fetchers to backoff for a while before closing the connection
         // if the shuffle connection limit is hit. Fetchers are expected to
         // handle this notification gracefully, that is, not treating this as a
         // fetch failure.
         headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
         sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-        return;
+      } else {
+        super.channelActive(ctx);
+        accepted.add(ctx.channel());
+        LOG.debug("Added channel: {}, channel id: {}. Accepted number of 
connections={}",
+            ctx.channel(), ctx.channel().id(), activeConnections.get());
       }
-      accepted.add(evt.getChannel());
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      NettyChannelHelper.channelInactive(ctx.channel());
+      super.channelInactive(ctx);
+      int noOfConnections = activeConnections.decrementAndGet();
+      LOG.debug("New value of Accepted number of connections={}", 
noOfConnections);
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg)
         throws Exception {
-      HttpRequest request = (HttpRequest) evt.getMessage();
-      if (request.getMethod() != GET) {
-          sendError(ctx, METHOD_NOT_ALLOWED);
-          return;
+      Channel channel = ctx.channel();
+      LOG.trace("Executing channelRead, channel id: {}", channel.id());
+      HttpRequest request = (HttpRequest) msg;
+      LOG.debug("Received HTTP request: {}, channel id: {}", request, 
channel.id());
+      if (request.method() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
       }
       // Check whether the shuffle version is compatible
-      if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.headers() != null ?
-              request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null)
-          || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.headers() != null ?
-                  request.headers()
-                      .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) {
+      String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+      String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME;

Review Comment:
   This is a valid concern, please fix it.
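
   The earlier comment this reply refers to is not quoted in this notification, so the exact concern is not visible here. One mismatch that is visible in the hunk above: the limit check now compares numConnections (from activeConnections.incrementAndGet()) against maxShuffleConnections, while the LOG.info call still formats accepted.size(), which at that point does not yet include the new channel. A minimal, hypothetical sketch of keeping the check and the log on the same counter follows; it is not the committed ShuffleHandler code, and the class name, limit value, and System.out logging are illustrative only.

   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch, not ShuffleHandler itself: the limit check and the
   // log statement read the same counter value (numConnections).
   public class ConnectionLimitSketch {
     private final AtomicInteger activeConnections = new AtomicInteger();
     private final int maxShuffleConnections = 80;  // illustrative value only

     boolean acceptConnection() {
       int numConnections = activeConnections.incrementAndGet();
       if (maxShuffleConnections > 0 && numConnections > maxShuffleConnections) {
         System.out.println(String.format(
             "Current number of shuffle connections (%d) is greater than "
                 + "the max allowed shuffle connections (%d)",
             numConnections, maxShuffleConnections));
         // The sketch rolls the counter back here; the patch above instead
         // decrements it when channelInactive fires for the closed channel.
         activeConnections.decrementAndGet();
         return false;
       }
       return true;
     }
   }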





> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, 
> HADOOP-15327.005.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, 
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, 
> testfailure-testReduceFromPartialMem.zip
>
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This way, we can remove the dependency on Netty 3 (jboss.netty).
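
For readers following the patch, the hunk quoted above shows the Netty 3 -> Netty 4 handler API mapping this upgrade applies: channelOpen(ctx, evt) becomes channelActive(ctx), messageReceived(ctx, evt) becomes channelRead(ctx, msg), evt.getChannel() becomes ctx.channel(), and request.getMethod() becomes request.method(). Below is a minimal, standalone sketch of a Netty 4 inbound handler in that style; it is not the ShuffleHandler code, and the class name and System.out logging are illustrative only.

   import io.netty.channel.ChannelHandlerContext;
   import io.netty.channel.ChannelInboundHandlerAdapter;
   import io.netty.handler.codec.http.HttpMethod;
   import io.netty.handler.codec.http.HttpRequest;

   // Sketch of the Netty 4 handler style used by the patch; not Hadoop code.
   public class Netty4StyleHandler extends ChannelInboundHandlerAdapter {

     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
       // Netty 4 passes only the context; the channel is reached via ctx.channel()
       // (Netty 3 handed over a ChannelStateEvent instead).
       System.out.println("Channel active, id=" + ctx.channel().id());
       super.channelActive(ctx);
     }

     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       // Netty 4 delivers the decoded message directly instead of wrapping it
       // in a MessageEvent.
       if (msg instanceof HttpRequest) {
         HttpRequest request = (HttpRequest) msg;
         if (!HttpMethod.GET.equals(request.method())) {
           // A real handler would answer with METHOD_NOT_ALLOWED here.
           System.out.println("Non-GET request on channel " + ctx.channel().id());
         }
       }
       super.channelRead(ctx, msg);
     }

     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       // Counterpart of channelActive; a natural place to release per-connection state.
       System.out.println("Channel inactive, id=" + ctx.channel().id());
       super.channelInactive(ctx);
     }
   }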


