Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 6e005ad32 -> a351a30d9
MAPREDUCE-6850. Shuffle Handler keep-alive connections are closed from the
server side. Contributed by Jonathan Eagles
(cherry picked from commit c8bd5fc7a86f9890ceaa37a89491ab650e7e9a64)
Conflicts:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a351a30d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a351a30d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a351a30d
Branch: refs/heads/branch-2.8
Commit: a351a30d9d5be72305a93c429bd38678236ab4cc
Parents: 6e005ad
Author: Jason Lowe <[email protected]>
Authored: Thu Mar 30 10:57:19 2017 -0500
Committer: Jason Lowe <[email protected]>
Committed: Thu Mar 30 11:15:25 2017 -0500
----------------------------------------------------------------------
.../apache/hadoop/mapred/ShuffleHandler.java | 68 ++++++++++++++++++--
.../hadoop/mapred/TestShuffleHandler.java | 32 ++++++++-
2 files changed, 92 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a351a30d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index d65132f..70b0bd7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -104,6 +104,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -126,7 +127,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.handler.timeout.IdleState;
+import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
+import org.jboss.netty.handler.timeout.IdleStateEvent;
+import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.CharsetUtil;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
import org.mortbay.jetty.HttpHeaders;
import com.google.common.annotations.VisibleForTesting;
@@ -225,6 +232,7 @@ public class ShuffleHandler extends AuxiliaryService {
public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
false;
+ private static final String TIMEOUT_HANDLER = "timeout";
/* the maximum number of files a single GET request can
open simultaneously during shuffle
@@ -234,8 +242,9 @@ public class ShuffleHandler extends AuxiliaryService {
public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
boolean connectionKeepAliveEnabled = false;
- int connectionKeepAliveTimeOut;
- int mapOutputMetaInfoCacheSize;
+ private int connectionKeepAliveTimeOut;
+ private int mapOutputMetaInfoCacheSize;
+ private Timer timer;
@Metrics(about="Shuffle output metrics", context="mapred")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -278,7 +287,15 @@ public class ShuffleHandler extends AuxiliaryService {
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
if (waitCount == 0) {
metrics.operationComplete(future);
- future.getChannel().close();
+ // Let the idle timer handler close keep-alive connections
+ if (reduceContext.getKeepAlive()) {
+ ChannelPipeline pipeline = future.getChannel().getPipeline();
+ TimeoutHandler timeoutHandler =
+ (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+ timeoutHandler.setEnabledTimeout(true);
+ } else {
+ future.getChannel().close();
+ }
} else {
pipelineFact.getSHUFFLE().sendMap(reduceContext);
}
@@ -299,11 +316,12 @@ public class ShuffleHandler extends AuxiliaryService {
private String user;
private Map<String, Shuffle.MapOutputInfo> infoMap;
private String outputBasePathStr;
+ private final boolean keepAlive;
public ReduceContext(List<String> mapIds, int rId,
ChannelHandlerContext context, String usr,
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
- String outputBasePath) {
+ String outputBasePath, boolean keepAlive) {
this.mapIds = mapIds;
this.reduceId = rId;
@@ -324,6 +342,7 @@ public class ShuffleHandler extends AuxiliaryService {
this.user = usr;
this.infoMap = mapOutputInfoMap;
this.outputBasePathStr = outputBasePath;
+ this.keepAlive = keepAlive;
}
public int getReduceId() {
@@ -357,6 +376,10 @@ public class ShuffleHandler extends AuxiliaryService {
public AtomicInteger getMapsToWait() {
return mapsToWait;
}
+
+ public boolean getKeepAlive() {
+ return keepAlive;
+ }
}
ShuffleHandler(MetricsSystem ms) {
@@ -493,8 +516,10 @@ public class ShuffleHandler extends AuxiliaryService {
secretManager = new JobTokenSecretManager();
recoverState(conf);
ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ // Timer is shared across entire factory and must be released separately
+ timer = new HashedWheelTimer();
try {
- pipelineFact = new HttpPipelineFactory(conf);
+ pipelineFact = new HttpPipelineFactory(conf, timer);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -534,6 +559,10 @@ public class ShuffleHandler extends AuxiliaryService {
if (pipelineFact != null) {
pipelineFact.destroy();
}
+ if (timer != null) {
+ // Release this shared timer resource
+ timer.stop();
+ }
if (stateDb != null) {
stateDb.close();
}
@@ -740,12 +769,29 @@ public class ShuffleHandler extends AuxiliaryService {
}
}
+ static class TimeoutHandler extends IdleStateAwareChannelHandler {
+
+ private boolean enabledTimeout;
+
+ void setEnabledTimeout(boolean enabledTimeout) {
+ this.enabledTimeout = enabledTimeout;
+ }
+
+ @Override
+ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
+ if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
+ e.getChannel().close();
+ }
+ }
+ }
+
class HttpPipelineFactory implements ChannelPipelineFactory {
final Shuffle SHUFFLE;
private SSLFactory sslFactory;
+ private final ChannelHandler idleStateHandler;
- public HttpPipelineFactory(Configuration conf) throws Exception {
+ public HttpPipelineFactory(Configuration conf, Timer timer) throws
Exception {
SHUFFLE = getShuffle(conf);
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
@@ -753,6 +799,7 @@ public class ShuffleHandler extends AuxiliaryService {
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
sslFactory.init();
}
+ this.idleStateHandler = new IdleStateHandler(timer, 0,
connectionKeepAliveTimeOut, 0);
}
public Shuffle getSHUFFLE() {
@@ -776,6 +823,8 @@ public class ShuffleHandler extends AuxiliaryService {
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", SHUFFLE);
+ pipeline.addLast("idle", idleStateHandler);
+ pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
return pipeline;
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
@@ -912,6 +961,10 @@ public class ShuffleHandler extends AuxiliaryService {
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
Channel ch = evt.getChannel();
+ ChannelPipeline pipeline = ch.getPipeline();
+ TimeoutHandler timeoutHandler =
+ (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+ timeoutHandler.setEnabledTimeout(false);
String user = userRsrc.get(jobId);
// $x/$user/appcache/$appId/output/$mapId
@@ -931,8 +984,9 @@ public class ShuffleHandler extends AuxiliaryService {
}
ch.write(response);
//Initialize one ReduceContext object per messageReceived call
+ boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
- user, mapOutputInfoMap, outputBasePathStr);
+ user, mapOutputInfoMap, outputBasePathStr, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
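
[Editor's note] For readers unfamiliar with the Netty 3.x idle-state machinery used above: the change adds an IdleStateHandler driven by a shared HashedWheelTimer plus a TimeoutHandler at the tail of the pipeline, and only arms the timeout once the last map output of a keep-alive request has been written. Below is a minimal, self-contained sketch of that pattern; the class, field, and timeout names are illustrative and not the exact ShuffleHandler code.

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;

public class KeepAliveTimeoutSketch {

  /** Closes the channel on writer-idle, but only once explicitly armed. */
  static class TimeoutHandler extends IdleStateAwareChannelHandler {
    private volatile boolean enabledTimeout;

    void setEnabledTimeout(boolean enabledTimeout) {
      this.enabledTimeout = enabledTimeout;
    }

    @Override
    public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
      // WRITER_IDLE fires after nothing has been written for the configured
      // interval; the flag keeps in-flight responses from being cut off.
      if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
        e.getChannel().close();
      }
    }
  }

  // A single timer shared by every pipeline; call timer.stop() on shutdown.
  private final Timer timer = new HashedWheelTimer();
  private final int keepAliveTimeoutSecs = 5; // illustrative value

  ChannelPipelineFactory pipelineFactory() {
    return new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() {
        ChannelPipeline pipeline = Channels.pipeline();
        // ... HTTP codec, chunking and application handlers go here ...
        pipeline.addLast("idle",
            new IdleStateHandler(timer, 0, keepAliveTimeoutSecs, 0));
        pipeline.addLast("timeout", new TimeoutHandler());
        return pipeline;
      }
    };
  }
}

With this arrangement the server no longer closes a keep-alive connection as soon as the response completes; the connection is closed only after it has been write-idle for the configured keep-alive timeout.
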
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a351a30d/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index ff8920f..d8fcbb9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -38,6 +38,7 @@ import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
+import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -80,7 +81,7 @@ import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.AbstractChannel;
@@ -310,6 +311,15 @@ public class TestShuffleHandler {
Assert.assertTrue("sendError called when client closed connection",
failures.size() == 0);
}
+ static class LastSocketAddress {
+ SocketAddress lastAddress;
+ void setAddress(SocketAddress lastAddress) {
+ this.lastAddress = lastAddress;
+ }
+ SocketAddress getSocketAddres() {
+ return lastAddress;
+ }
+ }
@Test(timeout = 10000)
public void testKeepAlive() throws Exception {
@@ -319,6 +329,8 @@ public class TestShuffleHandler {
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
true);
// try setting to -ve keep alive timeout.
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
+ final LastSocketAddress lastSocketAddress = new LastSocketAddress();
+
ShuffleHandler shuffleHandler = new ShuffleHandler() {
@Override
protected Shuffle getShuffle(final Configuration conf) {
@@ -364,6 +376,7 @@ public class TestShuffleHandler {
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, int reduce,
MapOutputInfo info) throws IOException {
+ lastSocketAddress.setAddress(ch.getRemoteAddress());
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
// send a shuffle header and a lot of data down the channel
@@ -423,6 +436,9 @@ public class TestShuffleHandler {
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
ShuffleHeader header = new ShuffleHeader();
header.readFields(input);
+ byte[] buffer = new byte[1024];
+ while (input.read(buffer) != -1) {}
+ SocketAddress firstAddress = lastSocketAddress.getSocketAddres();
input.close();
// For keepAlive via URL
@@ -444,6 +460,14 @@ public class TestShuffleHandler {
header = new ShuffleHeader();
header.readFields(input);
input.close();
+ SocketAddress secondAddress = lastSocketAddress.getSocketAddres();
+ Assert.assertNotNull("Initial shuffle address should not be null",
+ firstAddress);
+ Assert.assertNotNull("Keep-Alive shuffle address should not be null",
+ secondAddress);
+ Assert.assertEquals("Initial shuffle address and keep-alive shuffle "
+ + "address should be the same", firstAddress, secondAddress);
+
}
@Test(timeout = 10000)
@@ -1052,14 +1076,20 @@ public class TestShuffleHandler {
Mockito.mock(ChannelHandlerContext.class);
final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
final Channel mockCh = Mockito.mock(AbstractChannel.class);
+ final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
// Mock HttpRequest and ChannelFuture
final HttpRequest mockHttpRequest = createMockHttpRequest();
final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
listenerList);
+ final ShuffleHandler.TimeoutHandler timerHandler =
+ new ShuffleHandler.TimeoutHandler();
// Mock Netty Channel Context and Channel behavior
Mockito.doReturn(mockCh).when(mockCtx).getChannel();
+ Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
+ Mockito.when(mockPipeline.get(
+ Mockito.any(String.class))).thenReturn(timerHandler);
Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
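
[Editor's note] One client-side behaviour the updated testKeepAlive relies on is easy to miss: java.net.HttpURLConnection only returns a socket to its internal keep-alive cache once the response body has been read to EOF, which is why the test drains the first response before issuing the second request and then compares the two remote socket addresses. A minimal sketch of that drain-then-reuse pattern follows; the URL and port are illustrative, not the ones the test builds.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class KeepAliveClientSketch {
  public static void main(String[] args) throws IOException {
    // Illustrative shuffle-style URL; not the exact one used by the test.
    URL url = new URL("http://localhost:8080/mapOutput?keepAlive=true");
    for (int i = 0; i < 2; i++) {
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.connect();
      InputStream in = conn.getInputStream();
      byte[] buffer = new byte[1024];
      // Drain to EOF so the underlying socket is eligible for reuse on the
      // next iteration instead of being torn down.
      while (in.read(buffer) != -1) {
        // discard
      }
      in.close();
    }
  }
}
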