Repository: hadoop Updated Branches: refs/heads/branch-2.8 6e005ad32 -> a351a30d9
MAPREDUCE-6850. Shuffle Handler keep-alive connections are closed from the server side. Contributed by Jonathan Eagles (cherry picked from commit c8bd5fc7a86f9890ceaa37a89491ab650e7e9a64) Conflicts: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a351a30d Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a351a30d Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a351a30d Branch: refs/heads/branch-2.8 Commit: a351a30d9d5be72305a93c429bd38678236ab4cc Parents: 6e005ad Author: Jason Lowe <jl...@yahoo-inc.com> Authored: Thu Mar 30 10:57:19 2017 -0500 Committer: Jason Lowe <jl...@yahoo-inc.com> Committed: Thu Mar 30 11:15:25 2017 -0500 ---------------------------------------------------------------------- .../apache/hadoop/mapred/ShuffleHandler.java | 68 ++++++++++++++++++-- .../hadoop/mapred/TestShuffleHandler.java | 32 ++++++++- 2 files changed, 92 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a351a30d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index d65132f..70b0bd7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -104,6 +104,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -126,7 +127,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; +import org.jboss.netty.handler.timeout.IdleState; +import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; +import org.jboss.netty.handler.timeout.IdleStateEvent; +import org.jboss.netty.handler.timeout.IdleStateHandler; import org.jboss.netty.util.CharsetUtil; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timer; import org.mortbay.jetty.HttpHeaders; import com.google.common.annotations.VisibleForTesting; @@ -225,6 +232,7 @@ public class ShuffleHandler extends AuxiliaryService { public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true; public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = false; + private static final String TIMEOUT_HANDLER = "timeout"; /* the maximum number of files a single GET request can open simultaneously during shuffle @@ -234,8 +242,9 @@ public class ShuffleHandler extends AuxiliaryService { public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3; boolean connectionKeepAliveEnabled = false; - int connectionKeepAliveTimeOut; - int mapOutputMetaInfoCacheSize; + private int connectionKeepAliveTimeOut; + private int mapOutputMetaInfoCacheSize; + private Timer timer; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -278,7 +287,15 @@ public class ShuffleHandler extends AuxiliaryService { int waitCount = this.reduceContext.getMapsToWait().decrementAndGet(); if (waitCount == 0) { metrics.operationComplete(future); - future.getChannel().close(); + // Let the idle timer handler close keep-alive connections + if (reduceContext.getKeepAlive()) { + ChannelPipeline pipeline = future.getChannel().getPipeline(); + TimeoutHandler timeoutHandler = + (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER); + timeoutHandler.setEnabledTimeout(true); + } else { + future.getChannel().close(); + } } else { pipelineFact.getSHUFFLE().sendMap(reduceContext); } @@ -299,11 +316,12 @@ public class ShuffleHandler extends AuxiliaryService { private String user; private Map<String, Shuffle.MapOutputInfo> infoMap; private String outputBasePathStr; + private final boolean keepAlive; public ReduceContext(List<String> mapIds, int rId, ChannelHandlerContext context, String usr, Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap, - String outputBasePath) { + String outputBasePath, boolean keepAlive) { this.mapIds = mapIds; this.reduceId = rId; @@ -324,6 +342,7 @@ public class ShuffleHandler extends AuxiliaryService { this.user = usr; this.infoMap = mapOutputInfoMap; this.outputBasePathStr = outputBasePath; + this.keepAlive = keepAlive; } public int getReduceId() { @@ -357,6 +376,10 @@ public class ShuffleHandler extends AuxiliaryService { public AtomicInteger getMapsToWait() { return mapsToWait; } + + public boolean getKeepAlive() { + return keepAlive; + } } ShuffleHandler(MetricsSystem ms) { @@ -493,8 +516,10 @@ public class ShuffleHandler extends AuxiliaryService { secretManager = new JobTokenSecretManager(); recoverState(conf); ServerBootstrap bootstrap = new ServerBootstrap(selector); + // Timer is shared across entire factory and must be released separately + timer = new HashedWheelTimer(); try { - pipelineFact = new HttpPipelineFactory(conf); + pipelineFact = new HttpPipelineFactory(conf, timer); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -534,6 +559,10 @@ public class ShuffleHandler extends AuxiliaryService { if (pipelineFact != null) { pipelineFact.destroy(); } + if (timer != null) { + // Release this shared timer resource + timer.stop(); + } if (stateDb != null) { stateDb.close(); } @@ -740,12 +769,29 @@ public class ShuffleHandler extends AuxiliaryService { } } + static class TimeoutHandler extends IdleStateAwareChannelHandler { + + private boolean enabledTimeout; + + void setEnabledTimeout(boolean enabledTimeout) { + this.enabledTimeout = enabledTimeout; + } + + @Override + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) { + if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) { + e.getChannel().close(); + } + } + } + class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; private SSLFactory sslFactory; + private final ChannelHandler idleStateHandler; - public HttpPipelineFactory(Configuration conf) throws Exception { + public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception { SHUFFLE = getShuffle(conf); if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { @@ -753,6 +799,7 @@ public class ShuffleHandler extends AuxiliaryService { sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); sslFactory.init(); } + this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0); } public Shuffle getSHUFFLE() { @@ -776,6 +823,8 @@ public class ShuffleHandler extends AuxiliaryService { pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", SHUFFLE); + pipeline.addLast("idle", idleStateHandler); + pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler()); return pipeline; // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle @@ -912,6 +961,10 @@ public class ShuffleHandler extends AuxiliaryService { Map<String, MapOutputInfo> mapOutputInfoMap = new HashMap<String, MapOutputInfo>(); Channel ch = evt.getChannel(); + ChannelPipeline pipeline = ch.getPipeline(); + TimeoutHandler timeoutHandler = + (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER); + timeoutHandler.setEnabledTimeout(false); String user = userRsrc.get(jobId); // $x/$user/appcache/$appId/output/$mapId @@ -931,8 +984,9 @@ public class ShuffleHandler extends AuxiliaryService { } ch.write(response); //Initialize one ReduceContext object per messageReceived call + boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled; ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, - user, mapOutputInfoMap, outputBasePathStr); + user, mapOutputInfoMap, outputBasePathStr, keepAlive); for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { ChannelFuture nextMap = sendMap(reduceContext); if(nextMap == null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a351a30d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index ff8920f..d8fcbb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.net.HttpURLConnection; import java.net.SocketException; import java.net.URL; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -80,7 +81,7 @@ import org.apache.hadoop.yarn.server.records.Version; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.AbstractChannel; @@ -310,6 +311,15 @@ public class TestShuffleHandler { Assert.assertTrue("sendError called when client closed connection", failures.size() == 0); } + static class LastSocketAddress { + SocketAddress lastAddress; + void setAddress(SocketAddress lastAddress) { + this.lastAddress = lastAddress; + } + SocketAddress getSocketAddres() { + return lastAddress; + } + } @Test(timeout = 10000) public void testKeepAlive() throws Exception { @@ -319,6 +329,8 @@ public class TestShuffleHandler { conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); // try setting to -ve keep alive timeout. conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); + final LastSocketAddress lastSocketAddress = new LastSocketAddress(); + ShuffleHandler shuffleHandler = new ShuffleHandler() { @Override protected Shuffle getShuffle(final Configuration conf) { @@ -364,6 +376,7 @@ public class TestShuffleHandler { protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, int reduce, MapOutputInfo info) throws IOException { + lastSocketAddress.setAddress(ch.getRemoteAddress()); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); // send a shuffle header and a lot of data down the channel @@ -423,6 +436,9 @@ public class TestShuffleHandler { Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); ShuffleHeader header = new ShuffleHeader(); header.readFields(input); + byte[] buffer = new byte[1024]; + while (input.read(buffer) != -1) {} + SocketAddress firstAddress = lastSocketAddress.getSocketAddres(); input.close(); // For keepAlive via URL @@ -444,6 +460,14 @@ public class TestShuffleHandler { header = new ShuffleHeader(); header.readFields(input); input.close(); + SocketAddress secondAddress = lastSocketAddress.getSocketAddres(); + Assert.assertNotNull("Initial shuffle address should not be null", + firstAddress); + Assert.assertNotNull("Keep-Alive shuffle address should not be null", + secondAddress); + Assert.assertEquals("Initial shuffle address and keep-alive shuffle " + + "address should be the same", firstAddress, secondAddress); + } @Test(timeout = 10000) @@ -1052,14 +1076,20 @@ public class TestShuffleHandler { Mockito.mock(ChannelHandlerContext.class); final MessageEvent mockEvt = Mockito.mock(MessageEvent.class); final Channel mockCh = Mockito.mock(AbstractChannel.class); + final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class); // Mock HttpRequest and ChannelFuture final HttpRequest mockHttpRequest = createMockHttpRequest(); final ChannelFuture mockFuture = createMockChannelFuture(mockCh, listenerList); + final ShuffleHandler.TimeoutHandler timerHandler = + new ShuffleHandler.TimeoutHandler(); // Mock Netty Channel Context and Channel behavior Mockito.doReturn(mockCh).when(mockCtx).getChannel(); + Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline); + Mockito.when(mockPipeline.get( + Mockito.any(String.class))).thenReturn(timerHandler); Mockito.when(mockCtx.getChannel()).thenReturn(mockCh); Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class)); Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org