[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631244#comment-17631244 ]
ASF GitHub Bot commented on HADOOP-15327: ----------------------------------------- 9uapaw commented on code in PR #3259: URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018346414 ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java: ########## @@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) { } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) + public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelOpen(ctx, evt); - - if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) { + NettyChannelHelper.channelActive(ctx.channel()); + int numConnections = activeConnections.incrementAndGet(); + if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) { LOG.info(String.format("Current number of shuffle connections (%d) is " + - "greater than or equal to the max allowed shuffle connections (%d)", + "greater than the max allowed shuffle connections (%d)", accepted.size(), maxShuffleConnections)); - Map<String, String> headers = new HashMap<String, String>(1); + Map<String, String> headers = new HashMap<>(1); // notify fetchers to backoff for a while before closing the connection // if the shuffle connection limit is hit. Fetchers are expected to // handle this notification gracefully, that is, not treating this as a // fetch failure. headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY)); sendError(ctx, "", TOO_MANY_REQ_STATUS, headers); - return; + } else { + super.channelActive(ctx); + accepted.add(ctx.channel()); + LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}", + ctx.channel(), ctx.channel().id(), activeConnections.get()); } - accepted.add(evt.getChannel()); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + NettyChannelHelper.channelInactive(ctx.channel()); + super.channelInactive(ctx); + int noOfConnections = activeConnections.decrementAndGet(); + LOG.debug("New value of Accepted number of connections={}", noOfConnections); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - HttpRequest request = (HttpRequest) evt.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; + Channel channel = ctx.channel(); + LOG.trace("Executing channelRead, channel id: {}", channel.id()); + HttpRequest request = (HttpRequest) msg; + LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id()); + if (request.method() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; } // Check whether the shuffle version is compatible - if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - request.headers() != null ? - request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null) - || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - request.headers() != null ? - request.headers() - .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) { + String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION; + String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME; Review Comment: This is a valid concern, please fix it. > Upgrade MR ShuffleHandler to use Netty4 > --------------------------------------- > > Key: HADOOP-15327 > URL: https://issues.apache.org/jira/browse/HADOOP-15327 > Project: Hadoop Common > Issue Type: Sub-task > Reporter: Xiaoyu Yao > Assignee: Szilard Nemeth > Priority: Major > Labels: pull-request-available > Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, > HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, > HADOOP-15327.005.patch, > getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, > hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, > testfailure-testReduceFromPartialMem.zip > > Time Spent: 11.5h > Remaining Estimate: 0h > > This way, we can remove the dependencies on the netty3 (jboss.netty) -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org