szilard-nemeth commented on a change in pull request #3259: URL: https://github.com/apache/hadoop/pull/3259#discussion_r684217440
########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java ########## @@ -106,10 +129,584 @@ LoggerFactory.getLogger(TestShuffleHandler.class); private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir( TestShuffleHandler.class.getSimpleName() + "LocDir"); + private static final long ATTEMPT_ID = 12345L; + private static final long ATTEMPT_ID_2 = 12346L; + + + //Control test execution properties with these flags + private static final boolean DEBUG_MODE = false; + //WARNING: If this is set to true and proxy server is not running, tests will fail! + private static final boolean USE_PROXY = false; + private static final int HEADER_WRITE_COUNT = 100000; + private static TestExecution TEST_EXECUTION; + + private static class TestExecution { + private static final int DEFAULT_KEEP_ALIVE_TIMEOUT = -100; + private static final int DEBUG_FRIENDLY_KEEP_ALIVE = 1000; + private static final int DEFAULT_PORT = 0; //random port + private static final int FIXED_PORT = 8088; + private static final String PROXY_HOST = "127.0.0.1"; + private static final int PROXY_PORT = 8888; + private final boolean debugMode; + private final boolean useProxy; + + public TestExecution(boolean debugMode, boolean useProxy) { + this.debugMode = debugMode; + this.useProxy = useProxy; + } + + int getKeepAliveTimeout() { + if (debugMode) { + return DEBUG_FRIENDLY_KEEP_ALIVE; + } + return DEFAULT_KEEP_ALIVE_TIMEOUT; + } + + HttpURLConnection openConnection(URL url) throws IOException { + HttpURLConnection conn; + if (useProxy) { + Proxy proxy + = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, PROXY_PORT)); + conn = (HttpURLConnection) url.openConnection(proxy); + } else { + conn = (HttpURLConnection) url.openConnection(); + } + return conn; + } + + int shuffleHandlerPort() { + if (debugMode) { + return FIXED_PORT; + } else { + return DEFAULT_PORT; + } + } + + void parameterizeConnection(URLConnection conn) { + if (DEBUG_MODE) { + conn.setReadTimeout(1000000); + conn.setConnectTimeout(1000000); + } + } + } + + private static class ResponseConfig { + private static final int ONE_HEADER_DISPLACEMENT = 1; + + private final int headerWriteCount; + private final long actualHeaderWriteCount; + private final int mapOutputCount; + private final int contentLengthOfOneMapOutput; + private long headerSize; + public long contentLengthOfResponse; + + public ResponseConfig(int headerWriteCount, int mapOutputCount, int contentLengthOfOneMapOutput) { + if (mapOutputCount <= 0 && contentLengthOfOneMapOutput > 0) { + throw new IllegalStateException("mapOutputCount should be at least 1"); + } + this.headerWriteCount = headerWriteCount; + this.mapOutputCount = mapOutputCount; + this.contentLengthOfOneMapOutput = contentLengthOfOneMapOutput; + //MapOutputSender#send will send header N + 1 times + //So, (N + 1) * headerSize should be the Content-length header + the expected Content-length as well + this.actualHeaderWriteCount = headerWriteCount + ONE_HEADER_DISPLACEMENT; + } + + private void setHeaderSize(long headerSize) { + this.headerSize = headerSize; + long contentLengthOfAllHeaders = actualHeaderWriteCount * headerSize; + this.contentLengthOfResponse = computeContentLengthOfResponse(contentLengthOfAllHeaders); + LOG.debug("Content-length of all headers: {}", contentLengthOfAllHeaders); + LOG.debug("Content-length of one MapOutput: {}", contentLengthOfOneMapOutput); + LOG.debug("Content-length of final HTTP response: {}", contentLengthOfResponse); + } + + private long computeContentLengthOfResponse(long contentLengthOfAllHeaders) { + int mapOutputCountMultiplier = mapOutputCount; + if (mapOutputCount == 0) { + mapOutputCountMultiplier = 1; + } + return (contentLengthOfAllHeaders + contentLengthOfOneMapOutput) * mapOutputCountMultiplier; + } + } + + private enum ShuffleUrlType { + SIMPLE, WITH_KEEPALIVE, WITH_KEEPALIVE_MULTIPLE_MAP_IDS, WITH_KEEPALIVE_NO_MAP_IDS + } + + private static class InputStreamReadResult { + final String asString; + int totalBytesRead; + + public InputStreamReadResult(byte[] bytes, int totalBytesRead) { + this.asString = new String(bytes, StandardCharsets.UTF_8); + this.totalBytesRead = totalBytesRead; + } + } + + private static abstract class AdditionalMapOutputSenderOperations { + public abstract ChannelFuture perform(ChannelHandlerContext ctx, Channel ch) throws IOException; + } + + private class ShuffleHandlerForKeepAliveTests extends ShuffleHandler { + final LastSocketAddress lastSocketAddress = new LastSocketAddress(); + final ArrayList<Throwable> failures = new ArrayList<>(); + final ShuffleHeaderProvider shuffleHeaderProvider; + final HeaderPopulator headerPopulator; + MapOutputSender mapOutputSender; + private Consumer<IdleStateEvent> channelIdleCallback; + private CustomTimeoutHandler customTimeoutHandler; + private boolean failImmediatelyOnErrors = false; + private boolean closeChannelOnError = true; + private ResponseConfig responseConfig; + + public ShuffleHandlerForKeepAliveTests(long attemptId, ResponseConfig responseConfig, + Consumer<IdleStateEvent> channelIdleCallback) throws IOException { + this(attemptId, responseConfig); + this.channelIdleCallback = channelIdleCallback; + } + + public ShuffleHandlerForKeepAliveTests(long attemptId, ResponseConfig responseConfig) throws IOException { + this.responseConfig = responseConfig; + this.shuffleHeaderProvider = new ShuffleHeaderProvider(attemptId); + this.responseConfig.setHeaderSize(shuffleHeaderProvider.getShuffleHeaderSize()); + this.headerPopulator = new HeaderPopulator(this, responseConfig, shuffleHeaderProvider, true); + this.mapOutputSender = new MapOutputSender(responseConfig, lastSocketAddress, shuffleHeaderProvider); + setUseOutboundExceptionHandler(true); + } + + public void setFailImmediatelyOnErrors(boolean failImmediatelyOnErrors) { + this.failImmediatelyOnErrors = failImmediatelyOnErrors; + } + + public void setCloseChannelOnError(boolean closeChannelOnError) { + this.closeChannelOnError = closeChannelOnError; + } + + @Override + protected Shuffle getShuffle(final Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, + String jobId, String user) throws IOException { + return null; + } + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + long contentLength = headerPopulator.populateHeaders( + keepAliveParam); + super.setResponseHeaders(response, keepAliveParam, contentLength); + } + + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { + return mapOutputSender.send(ctx, ch); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().replace(HttpResponseEncoder.class, ENCODER_HANDLER_NAME, new LoggingHttpResponseEncoder(false)); + replaceTimeoutHandlerWithCustom(ctx); + LOG.debug("Modified pipeline: {}", ctx.pipeline()); + super.channelActive(ctx); + } + + private void replaceTimeoutHandlerWithCustom(ChannelHandlerContext ctx) { + TimeoutHandler oldTimeoutHandler = + (TimeoutHandler)ctx.pipeline().get(TIMEOUT_HANDLER); + int timeoutValue = + oldTimeoutHandler.getConnectionKeepAliveTimeOut(); + customTimeoutHandler = new CustomTimeoutHandler(timeoutValue, channelIdleCallback); + ctx.pipeline().replace(TIMEOUT_HANDLER, TIMEOUT_HANDLER, customTimeoutHandler); + } + + @Override + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + String message = "Error while processing request. Status: " + status; + handleError(ctx, message); + if (failImmediatelyOnErrors) { + stop(); + } + } + + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + String errMessage = String.format("Error while processing request. " + + "Status: " + + "%s, message: %s", status, message); + handleError(ctx, errMessage); + if (failImmediatelyOnErrors) { + stop(); + } + } + }; + } + + private void handleError(ChannelHandlerContext ctx, String message) { + LOG.error(message); + failures.add(new Error(message)); + if (closeChannelOnError) { + LOG.warn("sendError: Closing channel"); + ctx.channel().close(); + } + } + + private class CustomTimeoutHandler extends TimeoutHandler { + private boolean channelIdle = false; + private final Consumer<IdleStateEvent> channelIdleCallback; + + public CustomTimeoutHandler(int connectionKeepAliveTimeOut, + Consumer<IdleStateEvent> channelIdleCallback) { + super(connectionKeepAliveTimeOut); + this.channelIdleCallback = channelIdleCallback; + } + + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { + LOG.debug("Channel idle"); + this.channelIdle = true; + if (channelIdleCallback != null) { + LOG.debug("Calling channel idle callback.."); + channelIdleCallback.accept(e); + } + super.channelIdle(ctx, e); + } + } + } + + private static class MapOutputSender { + private final ResponseConfig responseConfig; + private final LastSocketAddress lastSocketAddress; + private final ShuffleHeaderProvider shuffleHeaderProvider; + private AdditionalMapOutputSenderOperations additionalMapOutputSenderOperations; + + public MapOutputSender(ResponseConfig responseConfig, LastSocketAddress lastSocketAddress, + ShuffleHeaderProvider shuffleHeaderProvider) { + this.responseConfig = responseConfig; + this.lastSocketAddress = lastSocketAddress; + this.shuffleHeaderProvider = shuffleHeaderProvider; + } + + public ChannelFuture send(ChannelHandlerContext ctx, Channel ch) throws IOException { + LOG.debug("In MapOutputSender#send"); + lastSocketAddress.setAddress(ch.remoteAddress()); + ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader(); + writeOneHeader(ch, header); + ChannelFuture future = writeHeaderNTimes(ch, header, responseConfig.headerWriteCount); + // This is the last operation + // It's safe to increment ShuffleHeader counter for better identification + shuffleHeaderProvider.incrementCounter(); + if (additionalMapOutputSenderOperations != null) { + return additionalMapOutputSenderOperations.perform(ctx, ch); + } + return future; + } + private void writeOneHeader(Channel ch, ShuffleHeader header) throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + LOG.debug("MapOutputSender#writeOneHeader before WriteAndFlush #1"); + ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength())); + LOG.debug("MapOutputSender#writeOneHeader after WriteAndFlush #1. outputBufferSize: " + dob.size()); + } + + private ChannelFuture writeHeaderNTimes(Channel ch, ShuffleHeader header, int iterations) throws IOException { + DataOutputBuffer dob = new DataOutputBuffer(); + for (int i = 0; i < iterations; ++i) { + header.write(dob); + } + LOG.debug("MapOutputSender#writeHeaderNTimes WriteAndFlush big chunk of data, outputBufferSize: " + dob.size()); + return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength())); Review comment: Removed writeOneHeader as it's enough to send the header only N times. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org