Repository: hadoop

Updated Branches:
  refs/heads/branch-2.7 431f48f65 -> 7b783911c
MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
strategy is dynamic. Contributed by Kuhu Shukla.
(cherry picked from commit 2868ca0328d908056745223fb38d9a90fd2811ba)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b783911
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b783911
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b783911

Branch: refs/heads/branch-2.7
Commit: 7b783911c4062dcc957a1d577ffec56471510d23
Parents: 431f48f
Author: Kihwal Lee <kih...@apache.org>
Authored: Fri Oct 30 14:58:54 2015 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Fri Oct 30 14:58:54 2015 -0500

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../tools/mapred/lib/DynamicInputChunk.java     | 137 +++----------------
 .../tools/mapred/lib/DynamicInputFormat.java    |  31 +++--
 .../tools/mapred/lib/DynamicRecordReader.java   |  13 +-
 .../org/apache/hadoop/tools/StubContext.java    |   4 +
 .../mapred/lib/TestDynamicInputFormat.java      |  33 ++++-
 6 files changed, 83 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9aac41a..7f61e4d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -55,6 +55,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6528. Memory leak for HistoryFileManager.getJobSummary()
     (Junping Du via jlowe)
 
+    MAPREDUCE-6451. DistCp has incorrect chunkFilePath for multiple jobs when
+    strategy is dynamic (Kuhu Shukla via kihwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
index 8482e7d..9bf8e47 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputChunk.java
@@ -20,14 +20,10 @@ package org.apache.hadoop.tools.mapred.lib;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.tools.CopyListingFileStatus;
-import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.util.DistCpUtils;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -47,72 +43,28 @@ import java.io.IOException;
  */
 class DynamicInputChunk<K, V> {
   private static Log LOG = LogFactory.getLog(DynamicInputChunk.class);
-
-  private static Configuration configuration;
-  private static Path chunkRootPath;
-  private static String chunkFilePrefix;
-  private static int numChunksLeft = -1; // Un-initialized before 1st dir-scan.
-  private static FileSystem fs;
-
   private Path chunkFilePath;
   private SequenceFileRecordReader<K, V> reader;
   private SequenceFile.Writer writer;
+  private DynamicInputChunkContext chunkContext;
 
-  private static void initializeChunkInvariants(Configuration config)
-          throws IOException {
-    configuration = config;
-    Path listingFilePath = new Path(getListingFilePath(configuration));
-    chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
-    fs = chunkRootPath.getFileSystem(configuration);
-    chunkFilePrefix = listingFilePath.getName() + ".chunk.";
-  }
-
-  private static String getListingFilePath(Configuration configuration) {
-    final String listingFileString = configuration.get(
-            DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
-    assert !listingFileString.equals("") : "Listing file not found.";
-    return listingFileString;
-  }
-
-  private static boolean areInvariantsInitialized() {
-    return chunkRootPath != null;
-  }
-
-  private DynamicInputChunk(String chunkId, Configuration configuration)
+  DynamicInputChunk(String chunkId, DynamicInputChunkContext chunkContext)
       throws IOException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(configuration);
-
-    chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
+    this.chunkContext = chunkContext;
+    chunkFilePath = new Path(chunkContext.getChunkRootPath(),
+        chunkContext.getChunkFilePrefix() + chunkId);
     openForWrite();
   }
-
   private void openForWrite() throws IOException {
     writer = SequenceFile.createWriter(
-            chunkFilePath.getFileSystem(configuration), configuration,
+            chunkContext.getFs(), chunkContext.getConfiguration(),
             chunkFilePath, Text.class, CopyListingFileStatus.class,
             SequenceFile.CompressionType.NONE);
   }
 
   /**
-   * Factory method to create chunk-files for writing to.
-   * (For instance, when the DynamicInputFormat splits the input-file into
-   * chunks.)
-   * @param chunkId String to identify the chunk.
-   * @param configuration Configuration, describing the location of the listing-
-   * file, file-system for the map-job, etc.
-   * @return A DynamicInputChunk, corresponding to a chunk-file, with the name
-   * incorporating the chunk-id.
-   * @throws IOException Exception on failure to create the chunk.
-   */
-  public static DynamicInputChunk createChunkForWrite(String chunkId,
-      Configuration configuration) throws IOException {
-    return new DynamicInputChunk(chunkId, configuration);
-  }
-
-  /**
    * Method to write records into a chunk.
   * @param key Key from the listing file.
   * @param value Corresponding value from the listing file.
@@ -135,19 +87,19 @@ class DynamicInputChunk<K, V> {
   * @throws IOException Exception on failure to reassign.
   */
  public void assignTo(TaskID taskId) throws IOException {
-    Path newPath = new Path(chunkRootPath, taskId.toString());
-    if (!fs.rename(chunkFilePath, newPath)) {
+    Path newPath = new Path(chunkContext.getChunkRootPath(), taskId.toString());
+    if (!chunkContext.getFs().rename(chunkFilePath, newPath)) {
      LOG.warn(chunkFilePath + " could not be assigned to " + taskId);
    }
  }
 
-  private DynamicInputChunk(Path chunkFilePath,
-                            TaskAttemptContext taskAttemptContext)
-          throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
+  public DynamicInputChunk(Path chunkFilePath,
+      TaskAttemptContext taskAttemptContext,
+      DynamicInputChunkContext chunkContext) throws IOException,
+      InterruptedException {
    this.chunkFilePath = chunkFilePath;
+    this.chunkContext = chunkContext;
    openForRead(taskAttemptContext);
  }
 
@@ -155,45 +107,8 @@ class DynamicInputChunk<K, V> {
          throws IOException, InterruptedException {
    reader = new SequenceFileRecordReader<K, V>();
    reader.initialize(new FileSplit(chunkFilePath, 0,
-            DistCpUtils.getFileSize(chunkFilePath, configuration), null),
-            taskAttemptContext);
-  }
-
-  /**
-   * Factory method that
-   * 1. acquires a chunk for the specified map-task attempt
-   * 2. returns a DynamicInputChunk associated with the acquired chunk-file.
-   * @param taskAttemptContext The attempt-context for the map task that's
-   * trying to acquire a chunk.
-   * @return The acquired dynamic-chunk. The chunk-file is renamed to the
-   * attempt-id (from the attempt-context.)
-   * @throws IOException Exception on failure.
-   * @throws InterruptedException Exception on failure.
-   */
-  public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext)
-          throws IOException, InterruptedException {
-    if (!areInvariantsInitialized())
-      initializeChunkInvariants(taskAttemptContext.getConfiguration());
-
-    String taskId
-            = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
-    Path acquiredFilePath = new Path(chunkRootPath, taskId);
-
-    if (fs.exists(acquiredFilePath)) {
-      LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
-      return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-    }
-
-    for (FileStatus chunkFile : getListOfChunkFiles()) {
-      if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
-        LOG.info(taskId + " acquired " + chunkFile.getPath());
-        return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
-      }
-      else
-        LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
-    }
-
-    return null;
+            DistCpUtils.getFileSize(chunkFilePath,
+                chunkContext.getConfiguration()), null), taskAttemptContext);
  }
 
  /**
@@ -204,19 +119,13 @@ class DynamicInputChunk<K, V> {
   */
  public void release() throws IOException {
    close();
-    if (!fs.delete(chunkFilePath, false)) {
+    if (!chunkContext.getFs().delete(chunkFilePath, false)) {
      LOG.error("Unable to release chunk at path: " + chunkFilePath);
-      throw new IOException("Unable to release chunk at path: " + chunkFilePath);
+      throw new IOException("Unable to release chunk at path: " +
+          chunkFilePath);
    }
  }
 
-  static FileStatus [] getListOfChunkFiles() throws IOException {
-    Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
-    FileStatus chunkFiles[] = fs.globStatus(chunkFilePattern);
-    numChunksLeft = chunkFiles.length;
-    return chunkFiles;
-  }
-
  /**
   * Getter for the chunk-file's path, on HDFS.
   * @return The qualified path to the chunk-file.
@@ -234,14 +143,4 @@ class DynamicInputChunk<K, V> {
    return reader;
  }
 
-  /**
-   * Getter for the number of chunk-files left in the chunk-file directory.
-   * Useful to determine how many chunks (and hence, records) are left to be
-   * processed.
-   * @return Before the first scan of the directory, the number returned is -1.
-   * Otherwise, the number of chunk-files seen from the last scan is returned.
-   */
-  public static int getNumChunksLeft() {
-    return numChunksLeft;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
index 38269c7..fe8604a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicInputFormat.java
@@ -57,7 +57,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
      = "mapred.num.splits";
  private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
      = "mapred.num.entries.per.chunk";
-
+  private DynamicInputChunkContext<K, V> chunkContext = null;
+
  /**
   * Implementation of InputFormat::getSplits(). This method splits up the
   * copy-listing file into chunks, and assigns the first batch to different
@@ -72,6 +73,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
      throws IOException, InterruptedException {
    LOG.info("DynamicInputFormat: Getting splits for job:"
             + jobContext.getJobID());
+    chunkContext = getChunkContext(jobContext.getConfiguration());
    return createSplits(jobContext,
                        splitCopyListingIntoChunksWithShuffle(jobContext));
  }
@@ -101,6 +103,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
  private static int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;
 
+  public DynamicInputChunkContext<K, V> getChunkContext(
+      Configuration configuration) throws IOException {
+    if(chunkContext == null) {
+      chunkContext = new DynamicInputChunkContext<K, V>(configuration);
+    }
+    return chunkContext;
+  }
  private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle
                                    (JobContext context) throws IOException {
@@ -146,8 +155,8 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
          closeAll(openChunks);
          chunksFinal.addAll(openChunks);
 
-          openChunks = createChunks(
-                  configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
+          openChunks = createChunks(chunkCount, nChunksTotal,
+              nChunksOpenAtOnce);
 
          chunkCount += openChunks.size();
 
@@ -183,9 +192,9 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
      chunk.close();
  }
 
-  private static List<DynamicInputChunk> createChunks(Configuration config,
-      int chunkCount, int nChunksTotal, int nChunksOpenAtOnce)
-      throws IOException {
+  private List<DynamicInputChunk> createChunks(int chunkCount,
+      int nChunksTotal, int nChunksOpenAtOnce)
+      throws IOException {
    List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
    int chunkIdUpperBound
            = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
@@ -197,14 +206,13 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
      chunkIdUpperBound = nChunksTotal;
 
    for (int i=chunkCount; i < chunkIdUpperBound; ++i)
-      chunks.add(createChunk(i, config));
+      chunks.add(createChunk(i));
    return chunks;
  }
 
-  private static DynamicInputChunk createChunk(int chunkId, Configuration config)
+  private DynamicInputChunk createChunk(int chunkId)
          throws IOException {
-    return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId),
-                                                 config);
+    return chunkContext.createChunkForWrite(String.format("%05d", chunkId));
  }
 
@@ -351,6 +359,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
          InputSplit inputSplit,
          TaskAttemptContext taskAttemptContext)
          throws IOException, InterruptedException {
-    return new DynamicRecordReader<K, V>();
+    chunkContext = getChunkContext(taskAttemptContext.getConfiguration());
+    return new DynamicRecordReader<K, V>(chunkContext);
  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
index 00b3c69..87b8f08 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/lib/DynamicRecordReader.java
@@ -49,9 +49,14 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
  private int numRecordsProcessedByThisMap = 0;
  private long timeOfLastChunkDirScan = 0;
  private boolean isChunkDirAlreadyScanned = false;
+  private DynamicInputChunkContext<K, V> chunkContext;
 
  private static long TIME_THRESHOLD_FOR_DIR_SCANS = TimeUnit.MINUTES.toMillis(5);
 
+  DynamicRecordReader(DynamicInputChunkContext<K, V> chunkContext) {
+    this.chunkContext = chunkContext;
+  }
+
  /**
   * Implementation for RecordReader::initialize(). Initializes the internal
   * RecordReader to read from chunks.
@@ -69,7 +74,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
    this.taskAttemptContext = taskAttemptContext;
    configuration = taskAttemptContext.getConfiguration();
    taskId = taskAttemptContext.getTaskAttemptID().getTaskID();
-    chunk = DynamicInputChunk.acquire(this.taskAttemptContext);
+    chunk = chunkContext.acquire(this.taskAttemptContext);
    timeOfLastChunkDirScan = System.currentTimeMillis();
    isChunkDirAlreadyScanned = false;
 
@@ -114,7 +119,7 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
      timeOfLastChunkDirScan = System.currentTimeMillis();
      isChunkDirAlreadyScanned = false;
 
-      chunk = DynamicInputChunk.acquire(taskAttemptContext);
+      chunk = chunkContext.acquire(taskAttemptContext);
 
      if (chunk == null) return false;
 
@@ -182,12 +187,12 @@ public class DynamicRecordReader<K, V> extends RecordReader<K, V> {
        || (!isChunkDirAlreadyScanned &&
              numRecordsProcessedByThisMap%numRecordsPerChunk
              > numRecordsPerChunk/2)) {
-      DynamicInputChunk.getListOfChunkFiles();
+      chunkContext.getListOfChunkFiles();
      isChunkDirAlreadyScanned = true;
      timeOfLastChunkDirScan = now;
    }
 
-    return DynamicInputChunk.getNumChunksLeft();
+    return chunkContext.getNumChunksLeft();
  }
 
  /**
   * Implementation of RecordReader::close().
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
index 1a2227c..4bb6c98 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/StubContext.java
@@ -64,6 +64,10 @@ public class StubContext {
    return reader;
  }
 
+  public void setReader(RecordReader<Text, CopyListingFileStatus> reader) {
+    this.reader = reader;
+  }
+
  public StubInMemoryWriter getWriter() {
    return writer;
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b783911/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
index 8cc8317..bb2dd9d 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/lib/TestDynamicInputFormat.java
@@ -40,6 +40,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -126,13 +127,14 @@ public class TestDynamicInputFormat {
    int taskId = 0;
 
    for (InputSplit split : splits) {
-      RecordReader<Text, CopyListingFileStatus> recordReader =
-           inputFormat.createRecordReader(split, null);
      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
-                                                recordReader, taskId);
+                                                null, taskId);
      final TaskAttemptContext taskAttemptContext = stubContext.getContext();
-
+
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+          inputFormat.createRecordReader(split, taskAttemptContext);
+      stubContext.setReader(recordReader);
      recordReader.initialize(splits.get(0), taskAttemptContext);
      float previousProgressValue = 0f;
      while (recordReader.nextKeyValue()) {
@@ -182,4 +184,27 @@ public class TestDynamicInputFormat {
    conf.setInt(DistCpConstants.CONF_LABEL_SPLIT_RATIO, 53);
    Assert.assertEquals(53, DynamicInputFormat.getSplitRatio(3, 200, conf));
  }
+
+  @Test
+  public void testDynamicInputChunkContext() throws IOException {
+    Configuration configuration = new Configuration();
+    configuration.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH,
+        "/tmp/test/file1.seq");
+    DynamicInputFormat firstInputFormat = new DynamicInputFormat();
+    DynamicInputFormat secondInputFormat = new DynamicInputFormat();
+    DynamicInputChunkContext firstContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext secondContext =
+        firstInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext thirdContext =
+        secondInputFormat.getChunkContext(configuration);
+    DynamicInputChunkContext fourthContext =
+        secondInputFormat.getChunkContext(configuration);
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
+        "object should be the same.",firstContext.equals(secondContext));
+    Assert.assertTrue("Chunk contexts from the same DynamicInputFormat " +
"object should be the same.",thirdContext.equals(fourthContext)); + Assert.assertTrue("Contexts from different DynamicInputFormat " + + "objects should be different.",!firstContext.equals(thirdContext)); + } }