Author: tomwhite
Date: Tue May 29 14:45:25 2012
New Revision: 1343756

URL: http://svn.apache.org/viewvc?rev=1343756&view=rev
Log:
Merge -r 1343754:1343755 from trunk to branch-2. Fixes: MAPREDUCE-4146

Added: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java - copied unchanged from r1343755, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue May 29 14:45:25 2012 @@ -8,6 +8,9 @@ Release 2.0.1-alpha - UNRELEASED IMPROVEMENTS + MAPREDUCE-4146. Support limits on task status string length and number of + block locations in branch-2. 
(Ahmed Radwan via tomwhite) + OPTIMIZATIONS BUG FIXES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue May 29 14:45:25 2012 @@ -53,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.IFile.Writer; -import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.FileSystemCounter; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskCounter; @@ -569,7 +568,21 @@ abstract public class Task implements Wr resourceCalculator.getProcResourceValues().getCumulativeCpuTime(); } } - + + public static String normalizeStatus(String status, Configuration conf) { + // Check to see if the status string is too long + // and truncate it if needed. 
+ int progressStatusLength = conf.getInt( + MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY, + MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT); + if (status.length() > progressStatusLength) { + LOG.warn("Task status: \"" + status + "\" truncated to max limit (" + + progressStatusLength + " characters)"); + status = status.substring(0, progressStatusLength); + } + return status; + } + @InterfaceAudience.Private @InterfaceStability.Unstable protected class TaskReporter @@ -603,7 +616,7 @@ abstract public class Task implements Wr return progressFlag.getAndSet(false); } public void setStatus(String status) { - taskProgress.setStatus(status); + taskProgress.setStatus(normalizeStatus(status, conf)); // indicate that progress update needs to be sent setProgressFlag(); } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Tue May 29 14:45:25 2012 @@ -71,4 +71,12 @@ public interface MRConfig { public static final String TASK_LOCAL_OUTPUT_CLASS = "mapreduce.task.local.output.class"; + + public static final String PROGRESS_STATUS_LEN_LIMIT_KEY = + "mapreduce.task.max.status.length"; + public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512; + + public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 
10; + public static final String MAX_BLOCK_LOCATIONS_KEY = + "mapreduce.job.max.split.locations"; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Tue May 29 14:45:25 2012 @@ -34,6 +34,7 @@ import org.apache.hadoop.io.serializer.S import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -48,6 +49,7 @@ public class JobSplitWriter { private static final int splitVersion = JobSplit.META_SPLIT_VERSION; private static final byte[] SPLIT_FILE_HEADER; + static { try { SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8"); @@ -82,7 +84,7 @@ public class JobSplitWriter { throws IOException { FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf); - SplitMetaInfo[] info = writeOldSplits(splits, out); + SplitMetaInfo[] info = writeOldSplits(splits, out, conf); out.close(); 
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, @@ -114,6 +116,8 @@ public class JobSplitWriter { if (array.length != 0) { SerializationFactory factory = new SerializationFactory(conf); int i = 0; + int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, + MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT); long offset = out.getPos(); for(T split: array) { long prevCount = out.getPos(); @@ -123,9 +127,15 @@ public class JobSplitWriter { serializer.open(out); serializer.serialize(split); long currCount = out.getPos(); + String[] locations = split.getLocations(); + if (locations.length > maxBlockLocations) { + throw new IOException("Max block location exceeded for split: " + + split + " splitsize: " + locations.length + + " maxsize: " + maxBlockLocations); + } info[i++] = new JobSplit.SplitMetaInfo( - split.getLocations(), offset, + locations, offset, split.getLength()); offset += currCount - prevCount; } @@ -135,18 +145,26 @@ public class JobSplitWriter { private static SplitMetaInfo[] writeOldSplits( org.apache.hadoop.mapred.InputSplit[] splits, - FSDataOutputStream out) throws IOException { + FSDataOutputStream out, Configuration conf) throws IOException { SplitMetaInfo[] info = new SplitMetaInfo[splits.length]; if (splits.length != 0) { int i = 0; long offset = out.getPos(); + int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, + MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT); for(org.apache.hadoop.mapred.InputSplit split: splits) { long prevLen = out.getPos(); Text.writeString(out, split.getClass().getName()); split.write(out); long currLen = out.getPos(); + String[] locations = split.getLocations(); + if (locations.length > maxBlockLocations) { + throw new IOException("Max block location exceeded for split: " + + split + " splitsize: " + locations.length + + " maxsize: " + maxBlockLocations); + } info[i++] = new JobSplit.SplitMetaInfo( - 
split.getLocations(), offset, + locations, offset, split.getLength()); offset += currLen - prevLen; } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Tue May 29 14:45:25 2012 @@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.task import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.StatusReporter; @@ -92,8 +93,9 @@ public class TaskAttemptContextImpl exte */ @Override public void setStatus(String status) { - setStatusString(status); - reporter.setStatus(status); + String normalizedStatus = Task.normalizeStatus(status, conf); + setStatusString(normalizedStatus); + reporter.setStatus(normalizedStatus); } public static class DummyReporter extends StatusReporter { Modified: 
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java?rev=1343756&r1=1343755&r2=1343756&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java Tue May 29 14:45:25 2012 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.mapred; +import java.io.DataOutputStream; import java.io.IOException; import java.util.Iterator; @@ -25,10 +26,15 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import static org.junit.Assert.*; /** @@ -98,7 +104,28 @@ public class TestReporter { progressRange, reporter.getProgress(), 0f); } } - + + static class StatusLimitMapper extends + org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> { + + @Override + public void map(LongWritable key, Text value, Context context) + throws IOException { + StringBuilder sb = new StringBuilder(512); + for (int i = 0; i < 1000; i++) { + sb.append("a"); + } + context.setStatus(sb.toString()); + int 
progressStatusLength = context.getConfiguration().getInt( + MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY, + MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT); + + if (context.getStatus().length() > progressStatusLength) { + throw new IOException("Status is not truncated"); + } + } + } + /** * Test {@link Reporter}'s progress for a map-only job. * This will make sure that only the map phase decides the attempt's progress. @@ -166,7 +193,6 @@ public class TestReporter { /** * Test {@link Reporter}'s progress for map-reduce job. */ - @SuppressWarnings("deprecation") @Test public void testReporterProgressForMRJob() throws IOException { Path test = new Path(testRootTempDir, "testReporterProgressForMRJob"); @@ -186,4 +212,39 @@ public class TestReporter { assertTrue("Job failed", job.isSuccessful()); } + + @Test + public void testStatusLimit() throws IOException, InterruptedException, + ClassNotFoundException { + Path test = new Path(testRootTempDir, "testStatusLimit"); + + Configuration conf = new Configuration(); + Path inDir = new Path(test, "in"); + Path outDir = new Path(test, "out"); + FileSystem fs = FileSystem.get(conf); + if (fs.exists(inDir)) { + fs.delete(inDir, true); + } + fs.mkdirs(inDir); + DataOutputStream file = fs.create(new Path(inDir, "part-" + 0)); + file.writeBytes("testStatusLimit"); + file.close(); + + if (fs.exists(outDir)) { + fs.delete(outDir, true); + } + + Job job = Job.getInstance(conf, "testStatusLimit"); + + job.setMapperClass(StatusLimitMapper.class); + job.setNumReduceTasks(0); + + FileInputFormat.addInputPath(job, inDir); + FileOutputFormat.setOutputPath(job, outDir); + + job.waitForCompletion(true); + + assertTrue("Job failed", job.isSuccessful()); + } + } \ No newline at end of file