[ https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631149#comment-17631149 ]
ASF GitHub Bot commented on HADOOP-15327: ----------------------------------------- K0K0V0K commented on code in PR #3259: URL: https://github.com/apache/hadoop/pull/3259#discussion_r1018076397 ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java: ########## @@ -182,19 +184,29 @@ public class ShuffleHandler extends AuxiliaryService { public static final HttpResponseStatus TOO_MANY_REQ_STATUS = new HttpResponseStatus(429, "TOO MANY REQUESTS"); - // This should kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT + // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT public static final long FETCH_RETRY_DELAY = 1000L; public static final String RETRY_AFTER_HEADER = "Retry-After"; + static final String ENCODER_HANDLER_NAME = "encoder"; private int port; - private ChannelFactory selector; - private final ChannelGroup accepted = new DefaultChannelGroup(); + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private ServerBootstrap bootstrap; + private Channel ch; + private final ChannelGroup accepted = + new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next()); Review Comment: Maybe there could be a line comment explaining why we have to create 5 event executors. ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java: ########## @@ -72,19 +73,21 @@ private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After"; protected final Reporter reporter; - private enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, + @VisibleForTesting + public enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP, CONNECTION, WRONG_REDUCE} - - private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors"; + + @VisibleForTesting + public final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors"; Review Comment: Won't Checkstyle complain here, expecting
public static final instead of public final static? ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java: ########## @@ -904,65 +990,84 @@ private List<String> splitMaps(List<String> mapq) { } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) + public void channelActive(ChannelHandlerContext ctx) throws Exception { - super.channelOpen(ctx, evt); - - if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) { + NettyChannelHelper.channelActive(ctx.channel()); + int numConnections = activeConnections.incrementAndGet(); + if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) { LOG.info(String.format("Current number of shuffle connections (%d) is " + - "greater than or equal to the max allowed shuffle connections (%d)", + "greater than the max allowed shuffle connections (%d)", accepted.size(), maxShuffleConnections)); - Map<String, String> headers = new HashMap<String, String>(1); + Map<String, String> headers = new HashMap<>(1); // notify fetchers to backoff for a while before closing the connection // if the shuffle connection limit is hit. Fetchers are expected to // handle this notification gracefully, that is, not treating this as a // fetch failure. headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY)); sendError(ctx, "", TOO_MANY_REQ_STATUS, headers); - return; + } else { + super.channelActive(ctx); + accepted.add(ctx.channel()); + LOG.debug("Added channel: {}, channel id: {}. 
Accepted number of connections={}", + ctx.channel(), ctx.channel().id(), activeConnections.get()); } - accepted.add(evt.getChannel()); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + NettyChannelHelper.channelInactive(ctx.channel()); + super.channelInactive(ctx); + int noOfConnections = activeConnections.decrementAndGet(); + LOG.debug("New value of Accepted number of connections={}", noOfConnections); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - HttpRequest request = (HttpRequest) evt.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); - return; + Channel channel = ctx.channel(); + LOG.trace("Executing channelRead, channel id: {}", channel.id()); + HttpRequest request = (HttpRequest) msg; + LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id()); + if (request.method() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; } // Check whether the shuffle version is compatible - if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals( - request.headers() != null ? - request.headers().get(ShuffleHeader.HTTP_HEADER_NAME) : null) - || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals( - request.headers() != null ? - request.headers() - .get(ShuffleHeader.HTTP_HEADER_VERSION) : null)) { + String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION; + String httpHeaderName = ShuffleHeader.HTTP_HEADER_NAME; Review Comment: Shouldn't this be DEFAULT_HTTP_HEADER_NAME? (Also, do we still need the if from line #1045?) 
########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties: ########## @@ -17,3 +17,5 @@ log4j.threshold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n +log4j.logger.io.netty=DEBUG +log4j.logger.org.apache.hadoop.mapred=DEBUG Review Comment: Won't this slow down the build process too much? ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java: ########## @@ -291,36 +302,86 @@ public void operationComplete(ChannelFuture future) throws Exception { } } + static class NettyChannelHelper { + static ChannelFuture writeToChannel(Channel ch, Object obj) { + LOG.debug("Writing {} to channel: {}", obj.getClass().getSimpleName(), ch.id()); + return ch.writeAndFlush(obj); + } + + static ChannelFuture writeToChannelAndClose(Channel ch, Object obj) { + return writeToChannel(ch, obj).addListener(ChannelFutureListener.CLOSE); + } + + static ChannelFuture writeToChannelAndAddLastHttpContent(Channel ch, HttpResponse obj) { + writeToChannel(ch, obj); + return writeLastHttpContentToChannel(ch); + } + + static ChannelFuture writeLastHttpContentToChannel(Channel ch) { + LOG.debug("Writing LastHttpContent, channel id: {}", ch.id()); + return ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } + + static ChannelFuture closeChannel(Channel ch) { + LOG.debug("Closing channel, channel id: {}", ch.id()); + return ch.close(); + } + + static void closeChannels(ChannelGroup channelGroup) { + channelGroup.close().awaitUninterruptibly(10, TimeUnit.SECONDS); + } + + public static ChannelFuture closeAsIdle(Channel channel, int timeout) { + LOG.debug("Closing channel as writer was idle for {} seconds", timeout); + return closeChannel(channel); + } + + public static 
void channelActive(Channel ch) { + LOG.debug("Executing channelActive, channel id: {}", ch.id()); + } + + public static void channelInactive(Channel channel) { + LOG.debug("Executing channelInactive, channel id: {}", channel.id()); + } Review Comment: Do we need the public keyword here? (If not, and we change these lines, maybe channelActive can be renamed to logChannelActive, same for channelInactive, and the channel parameter can be renamed to ch.) ########## hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java: ########## @@ -668,34 +1357,61 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, conns[i].connect(); } - //Ensure first connections are okay - conns[0].getInputStream(); - int rc = conns[0].getResponseCode(); - Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); - - conns[1].getInputStream(); - rc = conns[1].getResponseCode(); - Assert.assertEquals(HttpURLConnection.HTTP_OK, rc); - - // This connection should be closed because it to above the limit - try { - rc = conns[2].getResponseCode(); - Assert.assertEquals("Expected a too-many-requests response code", - ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc); - long backoff = Long.valueOf( - conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER)); - Assert.assertTrue("The backoff value cannot be negative.", backoff > 0); - conns[2].getInputStream(); - Assert.fail("Expected an IOException"); - } catch (IOException ioe) { - LOG.info("Expected - connection should not be open"); - } catch (NumberFormatException ne) { - Assert.fail("Expected a numerical value for RETRY_AFTER header field"); - } catch (Exception e) { - Assert.fail("Expected a IOException"); + Map<Integer, List<HttpURLConnection>> mapOfConnections = Maps.newHashMap(); + for (HttpURLConnection conn : conns) { + try { + conn.getInputStream(); + } catch (IOException ioe) { + LOG.info("Expected - connection should not be open"); + } catch 
(NumberFormatException ne) { + fail("Expected a numerical value for RETRY_AFTER header field"); + } catch (Exception e) { + fail("Expected a IOException"); + } + int statusCode = conn.getResponseCode(); + LOG.debug("Connection status code: {}", statusCode); + mapOfConnections.putIfAbsent(statusCode, new ArrayList<>()); + List<HttpURLConnection> connectionList = mapOfConnections.get(statusCode); + connectionList.add(conn); } + + assertEquals(String.format("Expected only %s and %s response", + OK_STATUS, ShuffleHandler.TOO_MANY_REQ_STATUS), + Sets.newHashSet( + HttpURLConnection.HTTP_OK, + ShuffleHandler.TOO_MANY_REQ_STATUS.code()), + mapOfConnections.keySet()); - shuffleHandler.stop(); + List<HttpURLConnection> successfulConnections = + mapOfConnections.get(HttpURLConnection.HTTP_OK); + assertEquals(String.format("Expected exactly %d requests " + + "with %s response", maxAllowedConnections, OK_STATUS), + maxAllowedConnections, successfulConnections.size()); + + //Ensure exactly one connection is HTTP 429 (TOO MANY REQUESTS) + List<HttpURLConnection> closedConnections = + mapOfConnections.get(ShuffleHandler.TOO_MANY_REQ_STATUS.code()); + assertEquals(String.format("Expected exactly %d %s response", + notAcceptedConnections, ShuffleHandler.TOO_MANY_REQ_STATUS), + notAcceptedConnections, closedConnections.size()); + + // This connection should be closed because it is above the maximum limit + HttpURLConnection conn = closedConnections.get(0); + assertEquals(String.format("Expected a %s response", + ShuffleHandler.TOO_MANY_REQ_STATUS), + ShuffleHandler.TOO_MANY_REQ_STATUS.code(), conn.getResponseCode()); + long backoff = Long.parseLong( + conn.getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER)); + assertTrue("The backoff value cannot be negative.", backoff > 0); + + shuffleHandler.stop(); + + //It's okay to get a ClosedChannelException. 
+ //All other kinds of exceptions means something went wrong + assertEquals("Should have no caught exceptions", + Collections.emptyList(), failures.stream() + .filter(f -> !(f instanceof ClosedChannelException)) + .collect(toList())); Review Comment: Maybe there could be a cleanup step here where we call the close method for every channel, to ensure we don't leave unused resources allocated? Or would that be too much? > Upgrade MR ShuffleHandler to use Netty4 > --------------------------------------- > > Key: HADOOP-15327 > URL: https://issues.apache.org/jira/browse/HADOOP-15327 > Project: Hadoop Common > Issue Type: Sub-task > Reporter: Xiaoyu Yao > Assignee: Szilard Nemeth > Priority: Major > Labels: pull-request-available > Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, > HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, > HADOOP-15327.005.patch, > getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, > hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, > testfailure-testReduceFromPartialMem.zip > > Time Spent: 11.5h > Remaining Estimate: 0h > > This way, we can remove the dependencies on the netty3 (jboss.netty) -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org