Author: vinodkv Date: Fri Mar 21 21:44:24 2014 New Revision: 1580063 URL: http://svn.apache.org/r1580063 Log: MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the MapReduce shuffle-handler. Contributed by Rajesh Balamohan. svn merge --ignore-ancestry -c 1580062 ../../trunk/
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1580063&r1=1580062&r2=1580063&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Mar 21 21:44:24 2014 @@ -34,6 +34,9 @@ Release 2.4.0 - UNRELEASED NEW FEATURES + MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the + MapReduce shuffle-handler. (Rajesh Balamohan via vinodkv) + IMPROVEMENTS MAPREDUCE-5464. 
Add analogs of the SLOTS_MILLIS counters that jive with the Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1580063&r1=1580062&r2=1580063&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Mar 21 21:44:24 2014 @@ -363,6 +363,21 @@ </property> <property> + <name>mapreduce.shuffle.connection-keep-alive.enable</name> + <value>false</value> + <description>set to true to support keep-alive connections.</description> +</property> + +<property> + <name>mapreduce.shuffle.connection-keep-alive.timeout</name> + <value>5</value> + <description>The number of seconds a shuffle client attempts to retain + http connection. 
Refer "Keep-Alive: timeout=" header in + Http specification + </description> +</property> + +<property> <name>mapreduce.task.timeout</name> <value>600000</value> <description>The number of milliseconds before a task will be Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1580063&r1=1580062&r2=1580063&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Fri Mar 21 21:44:24 2014 @@ -23,7 +23,6 @@ import static org.jboss.netty.handler.co import static org.jboss.netty.handler.codec.http.HttpMethod.GET; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; @@ -41,6 +40,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import 
java.util.concurrent.ConcurrentHashMap; @@ -110,6 +110,7 @@ import org.jboss.netty.handler.codec.htt import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.CharsetUtil; +import org.mortbay.jetty.HttpHeaders; import com.google.common.base.Charsets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -156,6 +157,21 @@ public class ShuffleHandler extends Auxi public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port"; public static final int DEFAULT_SHUFFLE_PORT = 13562; + public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = + "mapreduce.shuffle.connection-keep-alive.enable"; + public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false; + + public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = + "mapreduce.shuffle.connection-keep-alive.timeout"; + public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds + + public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = + "mapreduce.shuffle.mapoutput-info.meta.cache.size"; + public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = + 1000; + + public static final String CONNECTION_CLOSE = "close"; + public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "mapreduce.shuffle.ssl.file.buffer.size"; @@ -167,6 +183,9 @@ public class ShuffleHandler extends Auxi public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads"; // 0 implies Netty default of 2 * number of available processors public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0; + boolean connectionKeepAliveEnabled = false; + int connectionKeepAliveTimeOut; + int mapOutputMetaInfoCacheSize; @Metrics(about="Shuffle output metrics", context="mapred") static class ShuffleMetrics implements ChannelFutureListener { @@ -328,6 +347,15 @@ public class ShuffleHandler extends Auxi sslFileBufferSize = 
conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); + connectionKeepAliveEnabled = + conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, + DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED); + connectionKeepAliveTimeOut = + Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, + DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT)); + mapOutputMetaInfoCacheSize = + Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE, + DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE)); } @Override @@ -459,6 +487,15 @@ public class ShuffleHandler extends Auxi } final Map<String,List<String>> q = new QueryStringDecoder(request.getUri()).getParameters(); + final List<String> keepAliveList = q.get("keepAlive"); + boolean keepAliveParam = false; + if (keepAliveList != null && keepAliveList.size() == 1) { + keepAliveParam = Boolean.valueOf(keepAliveList.get(0)); + if (LOG.isDebugEnabled()) { + LOG.debug("KeepAliveParam : " + keepAliveList + + " : " + keepAliveParam); + } + } final List<String> mapIds = splitMaps(q.get("map")); final List<String> reduceQ = q.get("reduce"); final List<String> jobQ = q.get("job"); @@ -466,7 +503,8 @@ public class ShuffleHandler extends Auxi LOG.debug("RECV: " + request.getUri() + "\n mapId: " + mapIds + "\n reduceId: " + reduceQ + - "\n jobId: " + jobQ); + "\n jobId: " + jobQ + + "\n keepAlive: " + keepAliveParam); } if (mapIds == null || reduceQ == null || jobQ == null) { @@ -505,27 +543,46 @@ public class ShuffleHandler extends Auxi return; } + Map<String, MapOutputInfo> mapOutputInfoMap = + new HashMap<String, MapOutputInfo>(); Channel ch = evt.getChannel(); + String user = userRsrc.get(jobId); + + // $x/$user/appcache/$appId/output/$mapId + // TODO: Once Shuffle is out of NM, this can use MR APIs to convert + // between App and Job + String outputBasePathStr = getBaseLocation(jobId, user); + + try { + populateHeaders(mapIds, outputBasePathStr, user, reduceId, request, + response, keepAliveParam, 
mapOutputInfoMap); + } catch(IOException e) { + ch.write(response); + LOG.error("Shuffle error in populating headers :", e); + String errorMessage = getErrorMessage(e); + sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR); + return; + } ch.write(response); // TODO refactor the following into the pipeline ChannelFuture lastMap = null; for (String mapId : mapIds) { try { + MapOutputInfo info = mapOutputInfoMap.get(mapId); + if (info == null) { + info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user); + } lastMap = - sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId); + sendMapOutput(ctx, ch, user, mapId, + reduceId, info); if (null == lastMap) { sendError(ctx, NOT_FOUND); return; } } catch (IOException e) { LOG.error("Shuffle error :", e); - StringBuffer sb = new StringBuffer(e.getMessage()); - Throwable t = e; - while (t.getCause() != null) { - sb.append(t.getCause().getMessage()); - t = t.getCause(); - } - sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR); + String errorMessage = getErrorMessage(e); + sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR); return; } } @@ -533,6 +590,99 @@ public class ShuffleHandler extends Auxi lastMap.addListener(ChannelFutureListener.CLOSE); } + private String getErrorMessage(Throwable t) { + StringBuffer sb = new StringBuffer(t.getMessage()); + while (t.getCause() != null) { + sb.append(t.getCause().getMessage()); + t = t.getCause(); + } + return sb.toString(); + } + + private String getBaseLocation(String jobId, String user) { + final JobID jobID = JobID.forName(jobId); + final ApplicationId appID = + ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()), + jobID.getId()); + final String baseStr = + ContainerLocalizer.USERCACHE + "/" + user + "/" + + ContainerLocalizer.APPCACHE + "/" + + ConverterUtils.toString(appID) + "/output" + "/"; + return baseStr; + } + + protected MapOutputInfo getMapOutputInfo(String base, String mapId, + int reduce, String user) throws IOException { + // Index 
file + Path indexFileName = + lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); + IndexRecord info = + indexCache.getIndexInformation(mapId, reduce, indexFileName, user); + + Path mapOutputFileName = + lDirAlloc.getLocalPathToRead(base + "/file.out", conf); + if (LOG.isDebugEnabled()) { + LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName); + } + MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info); + return outputInfo; + } + + protected void populateHeaders(List<String> mapIds, String outputBaseStr, + String user, int reduce, HttpRequest request, HttpResponse response, + boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) + throws IOException { + + long contentLength = 0; + for (String mapId : mapIds) { + String base = outputBaseStr + mapId; + MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user); + if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { + mapOutputInfoMap.put(mapId, outputInfo); + } + // Index file + Path indexFileName = + lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf); + IndexRecord info = + indexCache.getIndexInformation(mapId, reduce, indexFileName, user); + ShuffleHeader header = + new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + + contentLength += info.partLength; + contentLength += dob.getLength(); + } + + // Now set the response headers. 
+ setResponseHeaders(response, keepAliveParam, contentLength); + } + + protected void setResponseHeaders(HttpResponse response, + boolean keepAliveParam, long contentLength) { + if (!connectionKeepAliveEnabled && !keepAliveParam) { + LOG.info("Setting connection close header..."); + response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE); + } else { + response.setHeader(HttpHeaders.CONTENT_LENGTH, + String.valueOf(contentLength)); + response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE); + response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout=" + + connectionKeepAliveTimeOut); + LOG.info("Content Length in shuffle : " + contentLength); + } + } + + class MapOutputInfo { + final Path mapOutputFileName; + final IndexRecord indexRecord; + + MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) { + this.mapOutputFileName = mapOutputFileName; + this.indexRecord = indexRecord; + } + } + protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException { @@ -575,39 +725,16 @@ public class ShuffleHandler extends Auxi } protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, - String user, String jobId, String mapId, int reduce) + String user, String mapId, int reduce, MapOutputInfo mapOutputInfo) throws IOException { - // TODO replace w/ rsrc alloc - // $x/$user/appcache/$appId/output/$mapId - // TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job - JobID jobID = JobID.forName(jobId); - ApplicationId appID = ApplicationId.newInstance( - Long.parseLong(jobID.getJtIdentifier()), jobID.getId()); - final String base = - ContainerLocalizer.USERCACHE + "/" + user + "/" - + ContainerLocalizer.APPCACHE + "/" - + ConverterUtils.toString(appID) + "/output" + "/" + mapId; - if (LOG.isDebugEnabled()) { - LOG.debug("DEBUG0 " + base); - } - // Index file - Path indexFileName = lDirAlloc.getLocalPathToRead( - base + 
"/file.out.index", conf); - // Map-output file - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - base + "/file.out", conf); - if (LOG.isDebugEnabled()) { - LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " - + indexFileName); - } - final IndexRecord info = - indexCache.getIndexInformation(mapId, reduce, indexFileName, user); + final IndexRecord info = mapOutputInfo.indexRecord; final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce); final DataOutputBuffer dob = new DataOutputBuffer(); header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); - final File spillfile = new File(mapOutputFileName.toString()); + final File spillfile = + new File(mapOutputInfo.mapOutputFileName.toString()); RandomAccessFile spill; try { spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1580063&r1=1580062&r2=1580063&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Fri Mar 21 21:44:24 2014 @@ -23,6 +23,8 @@ import static org.apache.hadoop.test.Met import static org.apache.hadoop.test.MockitoMaker.make; import static org.apache.hadoop.test.MockitoMaker.stub; import static 
org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeTrue; @@ -39,6 +41,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.zip.CheckedOutputStream; import java.util.zip.Checksum; @@ -69,17 +72,24 @@ import org.apache.hadoop.yarn.server.nod import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.junit.Assert; import org.junit.Test; - +import org.mortbay.jetty.HttpHeaders; public class TestShuffleHandler { static final long MiB = 1024 * 1024; private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class); + /** + * Test the validation of ShuffleHandler's meta-data's serialization and + * de-serialization. + * + * @throws Exception exception + */ @Test (timeout = 10000) public void testSerializeMeta() throws Exception { assertEquals(1, ShuffleHandler.deserializeMetaData( @@ -90,6 +100,11 @@ public class TestShuffleHandler { ShuffleHandler.serializeMetaData(8080))); } + /** + * Validate shuffle connection and input/output metrics. + * + * @throws Exception exception + */ @Test (timeout = 10000) public void testShuffleMetrics() throws Exception { MetricsSystem ms = new MetricsSystemImpl(); @@ -120,6 +135,11 @@ public class TestShuffleHandler { assertGauge("ShuffleConnections", connections, rb); } + /** + * Verify client prematurely closing a connection. + * + * @throws Exception exception. 
+ */ @Test (timeout = 10000) public void testClientClosesConnection() throws Exception { final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); @@ -131,13 +151,28 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override + protected MapOutputInfo getMapOutputInfo(String base, String mapId, + int reduce, String user) throws IOException { + return null; + } + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Only set response headers and skip everything else + // send some dummy value for content-length + super.setResponseHeaders(response, keepAliveParam, 100); + } + @Override protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException { } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String jobId, String mapId, int reduce) + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { // send a shuffle header and a lot of data down the channel // to trigger a broken pipe @@ -147,7 +182,7 @@ public class TestShuffleHandler { header.write(dob); ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); dob = new DataOutputBuffer(); - for (int i=0; i<100000; ++i) { + for (int i = 0; i < 100000; ++i) { header.write(dob); } return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); @@ -187,6 +222,7 @@ public class TestShuffleHandler { conn.connect(); DataInputStream input = new DataInputStream(conn.getInputStream()); Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION)); ShuffleHeader header = new ShuffleHeader(); 
header.readFields(input); input.close(); @@ -196,6 +232,147 @@ public class TestShuffleHandler { failures.size() == 0); } + @Test(timeout = 10000) + public void testKeepAlive() throws Exception { + final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true); + // try setting a negative keep-alive timeout. + conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(final Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected MapOutputInfo getMapOutputInfo(String base, String mapId, + int reduce, String user) throws IOException { + return null; + } + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Send some dummy data (populate content length details) + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100000; ++i) { + header.write(dob); + } + + long contentLength = dob.getLength(); + // for testing purpose; + // disable connectionKeepAliveEnabled if keepAliveParam is available + if (keepAliveParam) { + connectionKeepAliveEnabled = false; + } + + super.setResponseHeaders(response, keepAliveParam, contentLength); + } + + @Override + protected ChannelFuture 
sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + + // send a shuffle header and a lot of data down the channel + // to trigger a broken pipe + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i = 0; i < 100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + + @Override + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + String shuffleBaseURL = "http://127.0.0.1:" + + shuffleHandler.getConfig().get( + ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); + URL url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + DataInputStream input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpHeaders.KEEP_ALIVE, + conn.getHeaderField(HttpHeaders.CONNECTION)); + Assert.assertEquals("timeout=1", + conn.getHeaderField(HttpHeaders.KEEP_ALIVE)); + Assert.assertEquals(HttpURLConnection.HTTP_OK, 
conn.getResponseCode()); + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + input.close(); + + // For keepAlive via URL + url = + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + + "map=attempt_12345_1_m_1_0&keepAlive=true"); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, + ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); + conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, + ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION); + conn.connect(); + input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpHeaders.KEEP_ALIVE, + conn.getHeaderField(HttpHeaders.CONNECTION)); + Assert.assertEquals("timeout=1", + conn.getHeaderField(HttpHeaders.KEEP_ALIVE)); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + header = new ShuffleHeader(); + header.readFields(input); + input.close(); + } + + /** + * simulate a reducer that sends an invalid shuffle-header - sometimes a wrong + * header_name and sometimes a wrong version + * + * @throws Exception exception + */ @Test (timeout = 10000) public void testIncompatibleShuffleVersion() throws Exception { final int failureNum = 3; @@ -224,7 +401,12 @@ public class TestShuffleHandler { shuffleHandler.stop(); shuffleHandler.close(); } - + + /** + * Validate the limit on number of shuffle connections. + * + * @throws Exception exception + */ @Test (timeout = 10000) public void testMaxConnections() throws Exception { @@ -237,13 +419,28 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override + protected MapOutputInfo getMapOutputInfo(String base, String mapId, + int reduce, String user) throws IOException { + // Do nothing. 
+ return null; + } + @Override + protected void populateHeaders(List<String> mapIds, String jobId, + String user, int reduce, HttpRequest request, + HttpResponse response, boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) throws IOException { + // Do nothing. + } + @Override protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException { + // Do nothing. } @Override protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, - Channel ch, String user, String jobId, String mapId, int reduce) + Channel ch, String user, String mapId, int reduce, + MapOutputInfo info) throws IOException { // send a shuffle header and a lot of data down the channel // to trigger a broken pipe @@ -308,7 +505,13 @@ public class TestShuffleHandler { shuffleHandler.stop(); } - + + /** + * Validate the ownership of the map-output files being pulled in. The + * local-file-system owner of the file should match the user component in the map-output path. + * + * @throws Exception exception + */ @Test(timeout = 100000) public void testMapFileAccess() throws IOException { // This will run only if NativeIO is enabled, as SecureIOUtils needs it @@ -323,7 +526,7 @@ public class TestShuffleHandler { TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile(); conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); ApplicationId appId = ApplicationId.newInstance(12345, 1); - System.out.println(appId.toString()); + LOG.info(appId.toString()); String appAttemptId = "attempt_12345_1_m_1_0"; String user = "randomUser"; String reducerId = "0"; @@ -341,6 +544,7 @@ public class TestShuffleHandler { protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException { + // Do nothing. 
} }; @@ -393,7 +597,7 @@ public class TestShuffleHandler { } } - public static void createShuffleHandlerFiles(File logDir, String user, + private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId, Configuration conf, List<File> fileMap) throws IOException { String attemptDir = @@ -412,8 +616,8 @@ public class TestShuffleHandler { createMapOutputFile(mapOutputFile, conf); } - public static void - createMapOutputFile(File mapOutputFile, Configuration conf) + private static void + createMapOutputFile(File mapOutputFile, Configuration conf) throws IOException { FileOutputStream out = new FileOutputStream(mapOutputFile); out.write("Creating new dummy map output file. Used only for testing" @@ -422,7 +626,7 @@ public class TestShuffleHandler { out.close(); } - public static void createIndexFile(File indexFile, Configuration conf) + private static void createIndexFile(File indexFile, Configuration conf) throws IOException { if (indexFile.exists()) { System.out.println("Deleting existing file");