Author: wang
Date: Wed Aug 20 18:39:03 2014
New Revision: 1619197

URL: http://svn.apache.org/r1619197
Log:
HDFS-6134 and HADOOP-10150 subtasks. Merge fs-encryption branch to trunk.
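
For orientation: the merged feature is opt-in per job, via the configuration
keys this commit adds to MRJobConfig (see the MRJobConfig hunk below). A
minimal sketch of a client enabling it (the job name and the explicit values
are illustrative; the values shown are simply the defaults):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EncryptedIntermediateDataExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Encrypt intermediate (spill/shuffle) data; default is false.
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Key size in bits and crypto buffer size in KB; both default to 128.
        conf.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 128);
        conf.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, 128);
        Job job = Job.getInstance(conf, "encrypted-intermediate-data-example");
        // ...configure mapper/reducer/input/output as usual, then submit.
      }
    }
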
Added:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES-fs-encryption.txt
      - copied unchanged from r1619194, hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES-fs-encryption.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
      - copied unchanged from r1619194, hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
      - copied unchanged from r1619194, hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/   (props changed)
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt   (props changed)
    hadoop/common/trunk/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project:r1594376-1619194

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/CHANGES.txt:r1594376-1619194

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/conf:r1594376-1619194

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Wed Aug 20 18:39:03 2014
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.Merger.S
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * <code>BackupStore</code> is an utility class that is used to support
@@ -572,7 +574,9 @@ public class BackupStore<K,V> {
 
       file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), -1, conf);
 
-      return new Writer<K, V>(conf, fs, file);
+      FSDataOutputStream out = fs.create(file);
+      out = CryptoUtils.wrapIfNecessary(conf, out);
+      return new Writer<K, V>(conf, out, null, null, null, null, true);
     }
   }
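
The BackupStore hunk above is the write-side pattern this merge applies
throughout: open the raw FSDataOutputStream, wrap it with
CryptoUtils.wrapIfNecessary (a no-op when encryption is disabled), and hand
ownership to the IFile.Writer via the new ownOutputStream flag. A
self-contained sketch of the same pattern, with an illustrative local path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class WrapOnWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        // Illustrative path; any writable location works.
        FSDataOutputStream out = fs.create(new Path("/tmp/spill-sketch.out"));
        // Returns the stream unchanged unless intermediate encryption is on.
        out = CryptoUtils.wrapIfNecessary(conf, out);
        // ownOutputStream=true: the writer closes the wrapped stream on close().
        IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(
            conf, out, Text.class, Text.class, null, null, true);
        writer.append(new Text("key"), new Text("value"));
        writer.close();
      }
    }
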
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Wed Aug 20 18:39:03 2014
@@ -90,13 +90,11 @@ public class IFile {
     DataOutputBuffer buffer = new DataOutputBuffer();
 
-    public Writer(Configuration conf, FileSystem fs, Path file, 
-                  Class<K> keyClass, Class<V> valueClass,
-                  CompressionCodec codec,
-                  Counters.Counter writesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
-           writesCounter);
-      ownOutputStream = true;
+    public Writer(Configuration conf, FSDataOutputStream out,
+        Class<K> keyClass, Class<V> valueClass,
+        CompressionCodec codec, Counters.Counter writesCounter)
+        throws IOException {
+      this(conf, out, keyClass, valueClass, codec, writesCounter, false);
     }
 
     protected Writer(Counters.Counter writesCounter) {
@@ -105,7 +103,8 @@ public class IFile {
 
     public Writer(Configuration conf, FSDataOutputStream out, 
         Class<K> keyClass, Class<V> valueClass,
-        CompressionCodec codec, Counters.Counter writesCounter)
+        CompressionCodec codec, Counters.Counter writesCounter,
+        boolean ownOutputStream)
         throws IOException {
       this.writtenRecordsCounter = writesCounter;
       this.checksumOut = new IFileOutputStream(out);
@@ -137,11 +136,7 @@ public class IFile {
         this.valueSerializer = serializationFactory.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       }
-    }
-
-    public Writer(Configuration conf, FileSystem fs, Path file) 
-        throws IOException {
-      this(conf, fs, file, null, null, null, null);
+      this.ownOutputStream = ownOutputStream;
     }
 
     public void close() throws IOException {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Aug 20 18:39:03 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.lib.m
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -1580,7 +1581,8 @@ public class MapTask extends Task {
       IFile.Writer<K, V> writer = null;
       try {
         long segmentStart = out.getPos();
-        writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
+        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                   spilledRecordsCounter);
         if (combinerRunner == null) {
           // spill directly
@@ -1617,8 +1619,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, i);
 
           writer = null;
@@ -1668,7 +1670,8 @@ public class MapTask extends Task {
         try {
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
-          writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
+          FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+          writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                           spilledRecordsCounter);
 
           if (i == partition) {
@@ -1682,8 +1685,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, i);
 
           writer = null;
@@ -1825,12 +1828,13 @@ public class MapTask extends Task {
       try {
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
+          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
           Writer<K, V> writer =
-            new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
+            new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
           writer.close();
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           sr.putIndex(rec, i);
         }
         sr.writeToFile(finalIndexFile, job);
@@ -1879,8 +1883,9 @@ public class MapTask extends Task {
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
+          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
           Writer<K, V> writer =
-              new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
+              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
           if (combinerRunner == null || numSpills < minSpillsForCombine) {
             Merger.writeFile(kvIter, writer, reporter, job);
@@ -1896,8 +1901,8 @@ public class MapTask extends Task {
 
           // record offsets
           rec.startOffset = segmentStart;
-          rec.rawLength = writer.getRawLength();
-          rec.partLength = writer.getCompressedLength();
+          rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
+          rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
           spillRec.putIndex(rec, parts);
         }
         spillRec.writeToFile(finalIndexFile, job);
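
The repeated "+ CryptoUtils.cryptoPadding(job)" adjustments above all serve
one purpose: the crypto wrapper writes a fixed per-stream overhead (such as
the IV) to disk, so the spill index must record on-disk extents rather than
the byte counts the Writer observed, or later seeks into the spill file would
land short. A hypothetical helper capturing that bookkeeping (names are
illustrative; MapTask itself assigns the fields inline as shown above):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapred.IndexRecord;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class SpillIndexSketch {
      // Hypothetical helper: the segment's on-disk extent is the Writer's
      // logical lengths plus the constant overhead the crypto stream wrote
      // (zero when encryption is off).
      static IndexRecord toIndexRecord(Configuration job, long segmentStart,
                                       IFile.Writer<?, ?> writer) throws IOException {
        long pad = CryptoUtils.cryptoPadding(job);
        IndexRecord rec = new IndexRecord();
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + pad;
        rec.partLength = writer.getCompressedLength() + pad;
        return rec;
      }
    }
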
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Wed Aug 20 18:39:03 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapred.IFile.Re
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.util.PriorityQueue;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
@@ -298,8 +300,12 @@ public class Merger {
     void init(Counters.Counter readsCounter) throws IOException {
       if (reader == null) {
         FSDataInputStream in = fs.open(file);
+
         in.seek(segmentOffset);
-        reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
+        in = CryptoUtils.wrapIfNecessary(conf, in);
+        reader = new Reader<K, V>(conf, in,
+            segmentLength - CryptoUtils.cryptoPadding(conf),
+            codec, readsCounter);
       }
 
       if (mapOutputsCounter != null) {
@@ -714,9 +720,10 @@ public class Merger {
                                           tmpFilename.toString(),
                                           approxOutputSize, conf);
 
-        Writer<K, V> writer = 
-          new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
-                           writesCounter);
+        FSDataOutputStream out = fs.create(outputFile);
+        out = CryptoUtils.wrapIfNecessary(conf, out);
+        Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
+            codec, writesCounter, true);
         writeFile(this, writer, reporter, conf);
         writer.close();
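
The read side has to mirror that accounting, and the order of operations
matters: seek to the segment's on-disk offset on the raw stream first, then
wrap, and advertise only the logical length to the Reader. A sketch of the
sequence as a standalone method (parameter names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.IFile;
    import org.apache.hadoop.mapreduce.CryptoUtils;

    public class SegmentReadSketch {
      static IFile.Reader<Text, Text> openSegment(Configuration conf,
          FileSystem fs, Path file, long segmentOffset, long segmentLength)
          throws IOException {
        FSDataInputStream in = fs.open(file);
        in.seek(segmentOffset);                      // raw, on-disk offset
        in = CryptoUtils.wrapIfNecessary(conf, in);  // no-op when encryption is off
        // The recorded extent includes the crypto overhead; the Reader must
        // only see the logical IFile bytes.
        return new IFile.Reader<Text, Text>(conf, in,
            segmentLength - CryptoUtils.cryptoPadding(conf), null, null);
      }
    }
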
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Wed Aug 20 18:39:03 2014
@@ -291,7 +291,7 @@ class JobSubmitter {
   /**
    * configure the jobconf of the user with the command line options of 
    * -libjars, -files, -archives.
-   * @param conf
+   * @param job
    * @throws IOException
    */
   private void copyAndConfigureFiles(Job job, Path jobSubmitDir) 
@@ -376,8 +376,13 @@ class JobSubmitter {
     if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
       KeyGenerator keyGen;
       try {
+
+        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
+            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+            : SHUFFLE_KEY_LENGTH;
         keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-        keyGen.init(SHUFFLE_KEY_LENGTH);
+        keyGen.init(keyLen);
       } catch (NoSuchAlgorithmException e) {
         throw new IOException("Error generating shuffle secret key", e);
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Aug 20 18:39:03 2014
@@ -771,4 +771,18 @@ public interface MRJobConfig {
 
   public static final String TASK_PREEMPTION =
       "mapreduce.job.preemption";
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA =
+      "mapreduce.job.encrypted-intermediate-data";
+  public static final boolean DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA = false;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      "mapreduce.job.encrypted-intermediate-data-key-size-bits";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS =
+      128;
+
+  public static final String MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+      "mapreduce.job.encrypted-intermediate-data.buffer.kb";
+  public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB =
+      128;
 }
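
The JobSubmitter hunk above ties the two together: when intermediate-data
encryption is on, the shuffle secret doubles as the encryption key, so it is
generated at the configured key size instead of the legacy shuffle length.
The selection logic in isolation (the algorithm name and fallback length are
parameters standing in for JobSubmitter's private constants, and
CryptoUtils.isShuffleEncrypted is inlined as a config lookup):

    import java.security.NoSuchAlgorithmException;
    import javax.crypto.KeyGenerator;
    import javax.crypto.SecretKey;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class ShuffleKeySketch {
      static SecretKey makeShuffleKey(Configuration conf, String keygenAlgorithm,
          int legacyShuffleKeyLength) throws NoSuchAlgorithmException {
        // Pick the key size: configured bits when encrypting, legacy otherwise.
        int keyLen = conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : legacyShuffleKeyLength;
        KeyGenerator keyGen = KeyGenerator.getInstance(keygenAlgorithm);
        keyGen.init(keyLen);
        return keyGen.generateKey();
      }
    }
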
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Wed Aug 20 18:39:03 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.task
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.security.ssl.SSLFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -65,6 +67,7 @@ class Fetcher<K,V> extends Thread {
                     CONNECTION, WRONG_REDUCE}
 
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
+  private final JobConf jobConf;
   private final Counters.Counter connectionErrs;
   private final Counters.Counter ioErrs;
   private final Counters.Counter wrongLengthErrs;
@@ -104,6 +107,7 @@ class Fetcher<K,V> extends Thread {
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey shuffleKey,
                  int id) {
+    this.jobConf = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -396,7 +400,11 @@ class Fetcher<K,V> extends Thread {
         return remaining.toArray(new TaskAttemptID[remaining.size()]);
       }
-      
+      InputStream is = input;
+      is = CryptoUtils.wrapIfNecessary(jobConf, is, compressedLength);
+      compressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      decompressedLength -= CryptoUtils.cryptoPadding(jobConf);
+      
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
           remaining, mapId)) {
@@ -433,7 +441,7 @@ class Fetcher<K,V> extends Thread {
         LOG.info("fetcher#" + id + " about to shuffle output of map "
             + mapOutput.getMapId() + " decomp: " + decompressedLength
            + " len: " + compressedLength + " to " + mapOutput.getDescription());
-        mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+        mapOutput.shuffle(host, is, compressedLength, decompressedLength,
            metrics, reporter);
       } catch (java.lang.InternalError e) {
         LOG.warn("Failed to shuffle for fetcher#"+id, e);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java Wed Aug 20 18:39:03 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.MapOutpu
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SpillRecord;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 
 /**
  * LocalFetcher is used by LocalJobRunner to perform a local filesystem
@@ -145,6 +146,9 @@ class LocalFetcher<K,V> extends Fetcher<
     // now read the file, seek to the appropriate section, and send it.
     FileSystem localFs = FileSystem.getLocal(job).getRaw();
     FSDataInputStream inStream = localFs.open(mapOutputFileName);
+
+    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
+
     try {
       inStream.seek(ir.startOffset);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Wed Aug 20 18:39:03 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -54,6 +55,7 @@ import org.apache.hadoop.mapred.Task.Com
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MapOutput.MapOutputComparator;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -228,6 +230,10 @@ public class MergeManagerImpl<K, V> impl
     return new InMemoryMerger(this);
   }
 
+  protected MergeThread<CompressAwarePath,K,V> createOnDiskMerger() {
+    return new OnDiskMerger(this);
+  }
+
   TaskAttemptID getReduceId() {
     return reduceId;
   }
@@ -453,11 +459,10 @@ public class MergeManagerImpl<K, V> impl
         mergeOutputSize).suffix(
             Task.MERGED_OUTPUT_PREFIX);
 
-    Writer<K,V> writer = 
-      new Writer<K,V>(jobConf, rfs, outputPath,
-                      (Class<K>) jobConf.getMapOutputKeyClass(),
-                      (Class<V>) jobConf.getMapOutputValueClass(),
-                      codec, null);
+    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+    Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+        (Class<K>) jobConf.getMapOutputKeyClass(),
+        (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
 
     RawKeyValueIterator rIter = null;
     CompressAwarePath compressAwarePath;
@@ -537,11 +542,12 @@ public class MergeManagerImpl<K, V> impl
       Path outputPath = 
         localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
             approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);
-      Writer<K,V> writer = 
-        new Writer<K,V>(jobConf, rfs, outputPath,
-                        (Class<K>) jobConf.getMapOutputKeyClass(),
-                        (Class<V>) jobConf.getMapOutputValueClass(),
-                        codec, null);
+
+      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
+      Writer<K, V> writer = new Writer<K, V>(jobConf, out,
+          (Class<K>) jobConf.getMapOutputKeyClass(),
+          (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);
+
       RawKeyValueIterator iter = null;
       CompressAwarePath compressAwarePath;
       Path tmpDir = new Path(reduceId.toString());
@@ -717,8 +723,10 @@ public class MergeManagerImpl<K, V> impl
           keyClass, valueClass, memDiskSegments, numMemDiskSegments,
           tmpDir, comparator, reporter, spilledRecordsCounter, null, 
          mergePhase);
-      Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
-          keyClass, valueClass, codec, null);
+
+      FSDataOutputStream out = CryptoUtils.wrapIfNecessary(job, fs.create(outputPath));
+      Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
+          codec, null, true);
       try {
         Merger.writeFile(rIter, writer, reporter, job);
         writer.close();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Wed Aug 20 18:39:03 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.mapred.MapOutputFile;
 
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -75,7 +76,7 @@ class OnDiskMapOutput<K, V> extends MapO
     this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
-    disk = fs.create(tmpOutputPath);
+    disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
   }
 
   @VisibleForTesting

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/fs-encryption/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1594376-1619194
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/DistCp.md.vm Wed Aug 20 18:39:03 2014
@@ -191,6 +191,26 @@ $H3 Update and Overwrite
 
     If `-update` is used, `1` is overwritten as well.
 
+$H3 raw Namespace Extended Attribute Preservation
+
+  This section only applies to HDFS.
+
+  If the target and all of the source pathnames are in the /.reserved/raw
+  hierarchy, then 'raw' namespace extended attributes will be preserved.
+  'raw' xattrs are used by the system for internal functions such as encryption
+  meta data. They are only visible to users when accessed through the
+  /.reserved/raw hierarchy.
+
+  raw xattrs are preserved based solely on whether /.reserved/raw prefixes are
+  supplied. The -p (preserve, see below) flag does not impact preservation of
+  raw xattrs.
+
+  To prevent raw xattrs from being preserved, simply do not use the
+  /.reserved/raw prefix on any of the source and target paths.
+
+  If the /.reserved/raw prefix is specified on only a subset of the source and
+  target paths, an error will be displayed and a non-0 exit code returned.
+
 Command Line Options
 --------------------
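
Concretely, under the rule the new DistCp section states (paths here are
illustrative): `hadoop distcp /.reserved/raw/src /.reserved/raw/dest`
preserves raw xattrs because both sides use the prefix;
`hadoop distcp /src /dest` copies the same data without them; and mixing the
prefix on only one side, as in `hadoop distcp /.reserved/raw/src /dest`,
fails with an error and a non-0 exit code.
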
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java Wed Aug 20 18:39:03 2014
@@ -24,14 +24,16 @@ import static org.mockito.Mockito.doAnsw
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.junit.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,10 +53,16 @@ import org.apache.hadoop.mapred.RawKeyVa
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
@@ -63,40 +71,48 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Lists;
+
 public class TestMerger {
 
   private Configuration conf;
   private JobConf jobConf;
   private FileSystem fs;
-  
+
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
     jobConf = new JobConf();
     fs = FileSystem.getLocal(conf);
   }
-  
-  @After
-  public void cleanup() throws IOException {
-    fs.delete(new Path(jobConf.getLocalDirs()[0]), true);
+
+
+  @Test
+  public void testEncryptedMerger() throws Throwable {
+    jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+    Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
+    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    testInMemoryAndOnDiskMerger();
   }
-  
+
   @Test
-  public void testInMemoryMerger() throws Throwable {
+  public void testInMemoryAndOnDiskMerger() throws Throwable {
     JobID jobId = new JobID("a", 0);
-    TaskAttemptID reduceId = new TaskAttemptID(
+    TaskAttemptID reduceId1 = new TaskAttemptID(
         new TaskID(jobId, TaskType.REDUCE, 0), 0);
     TaskAttemptID mapId1 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 1), 0);
     TaskAttemptID mapId2 = new TaskAttemptID(
         new TaskID(jobId, TaskType.MAP, 2), 0);
-    
+
     LocalDirAllocator lda = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-    
+
     MergeManagerImpl<Text, Text> mergeManager = new MergeManagerImpl<Text, Text>(
-      reduceId, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+      reduceId1, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
       null, null, new Progress(), new MROutputFiles());
-    
+
     // write map outputs
     Map<String, String> map1 = new TreeMap<String, String>();
     map1.put("apple", "disgusting");
@@ -113,32 +129,88 @@ public class TestMerger {
         mapOutputBytes1.length);
     System.arraycopy(mapOutputBytes2, 0, mapOutput2.getMemory(), 0,
         mapOutputBytes2.length);
-    
+
     // create merger and run merge
     MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
         mergeManager.createInMemoryMerger();
-    List<InMemoryMapOutput<Text, Text>> mapOutputs =
+    List<InMemoryMapOutput<Text, Text>> mapOutputs1 =
         new ArrayList<InMemoryMapOutput<Text, Text>>();
-    mapOutputs.add(mapOutput1);
-    mapOutputs.add(mapOutput2);
-    
-    inMemoryMerger.merge(mapOutputs);
-    
+    mapOutputs1.add(mapOutput1);
+    mapOutputs1.add(mapOutput2);
+
+    inMemoryMerger.merge(mapOutputs1);
+
     Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
-    Path outPath = mergeManager.onDiskMapOutputs.iterator().next();
-    
+
+    TaskAttemptID reduceId2 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.REDUCE, 3), 0);
+    TaskAttemptID mapId3 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 4), 0);
+    TaskAttemptID mapId4 = new TaskAttemptID(
+        new TaskID(jobId, TaskType.MAP, 5), 0);
+
+    // write map outputs
+    Map<String, String> map3 = new TreeMap<String, String>();
+    map3.put("apple", "awesome");
+    map3.put("carrot", "amazing");
+    Map<String, String> map4 = new TreeMap<String, String>();
+    map4.put("banana", "bla");
+    byte[] mapOutputBytes3 = writeMapOutput(conf, map3);
+    byte[] mapOutputBytes4 = writeMapOutput(conf, map4);
+    InMemoryMapOutput<Text, Text> mapOutput3 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId3, mergeManager, mapOutputBytes3.length, null, true);
+    InMemoryMapOutput<Text, Text> mapOutput4 = new InMemoryMapOutput<Text, Text>(
+        conf, mapId4, mergeManager, mapOutputBytes4.length, null, true);
+    System.arraycopy(mapOutputBytes3, 0, mapOutput3.getMemory(), 0,
+        mapOutputBytes3.length);
+    System.arraycopy(mapOutputBytes4, 0, mapOutput4.getMemory(), 0,
+        mapOutputBytes4.length);
+
+//    // create merger and run merge
+    MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
+        mergeManager.createInMemoryMerger();
+    List<InMemoryMapOutput<Text, Text>> mapOutputs2 =
+        new ArrayList<InMemoryMapOutput<Text, Text>>();
+    mapOutputs2.add(mapOutput3);
+    mapOutputs2.add(mapOutput4);
+
+    inMemoryMerger2.merge(mapOutputs2);
+
+    Assert.assertEquals(2, mergeManager.onDiskMapOutputs.size());
+
+    List<CompressAwarePath> paths = new ArrayList<CompressAwarePath>();
+    Iterator<CompressAwarePath> iterator = mergeManager.onDiskMapOutputs.iterator();
     List<String> keys = new ArrayList<String>();
     List<String> values = new ArrayList<String>();
-    readOnDiskMapOutput(conf, fs, outPath, keys, values);
-    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot"));
-    Assert.assertEquals(values, Arrays.asList("disgusting", "pretty good", "delicious"));
+    while (iterator.hasNext()) {
+      CompressAwarePath next = iterator.next();
+      readOnDiskMapOutput(conf, fs, next, keys, values);
+      paths.add(next);
+    }
+    Assert.assertEquals(keys, Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
+    mergeManager.close();
+
+    mergeManager = new MergeManagerImpl<Text, Text>(
+        reduceId2, jobConf, fs, lda, Reporter.NULL, null, null, null, null, null,
+        null, null, new Progress(), new MROutputFiles());
+
+    MergeThread<CompressAwarePath,Text,Text> onDiskMerger = mergeManager.createOnDiskMerger();
+    onDiskMerger.merge(paths);
+
+    Assert.assertEquals(1, mergeManager.onDiskMapOutputs.size());
+
+    keys = new ArrayList<String>();
+    values = new ArrayList<String>();
+    readOnDiskMapOutput(conf, fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
+    Assert.assertEquals(keys, Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
+    Assert.assertEquals(values, Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
 
     mergeManager.close();
     Assert.assertEquals(0, mergeManager.inMemoryMapOutputs.size());
     Assert.assertEquals(0, mergeManager.inMemoryMergedMapOutputs.size());
     Assert.assertEquals(0, mergeManager.onDiskMapOutputs.size());
   }
-  
+
   private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues)
@@ -152,11 +224,13 @@ public class TestMerger {
     writer.close();
     return baos.toByteArray();
   }
-  
+
   private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
       List<String> keys, List<String> values) throws IOException {
-    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, fs,
-        path, null, null);
+    FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));
+
+    IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
+        fs.getFileStatus(path).getLen(), null, null);
     DataInputBuffer keyBuff = new DataInputBuffer();
     DataInputBuffer valueBuff = new DataInputBuffer();
     Text key = new Text();
@@ -169,17 +243,17 @@ public class TestMerger {
       values.add(value.toString());
     }
   }
-  
+
   @Test
   public void testCompressed() throws IOException {
     testMergeShouldReturnProperProgress(getCompressedSegments());
-  }
-  
+}
+
   @Test
   public void testUncompressed() throws IOException {
     testMergeShouldReturnProperProgress(getUncompressedSegments());
   }
-  
+
   @SuppressWarnings( { "deprecation", "unchecked" })
   public void testMergeShouldReturnProperProgress(
       List<Segment<Text, Text>> segments) throws IOException {
@@ -212,7 +286,7 @@ public class TestMerger {
     }
     return segments;
   }
-  
+
   private List<Segment<Text, Text>> getCompressedSegments() throws IOException {
     List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
     for (int i = 1; i < 1; i++) {
@@ -220,7 +294,7 @@ public class TestMerger {
     }
     return segments;
   }
-  
+
   private Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
     return new Segment<Text, Text>(getReader(i), false);
   }
@@ -258,7 +332,7 @@ public class TestMerger {
     }
   };
   }
-  
+
   private Answer<?> getValueAnswer(final String segmentName) {
     return new Answer<Void>() {
       int i = 0;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Wed Aug 20 18:39:03 2014
@@ -18,6 +18,8 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,7 +44,7 @@ public class TestIFile {
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
     IFile.Writer<Text, Text> writer =
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                    codec, null);
     writer.close();
   }
@@ -56,12 +58,15 @@ public class TestIFile {
     Path path = new Path(new Path("build/test.ifile"), "data");
     DefaultCodec codec = new GzipCodec();
     codec.setConf(conf);
+    FSDataOutputStream out = rfs.create(path);
     IFile.Writer<Text, Text> writer =
-        new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+        new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
                                      codec, null);
     writer.close();
+    FSDataInputStream in = rfs.open(path);
     IFile.Reader<Text, Text> reader =
-      new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
+      new IFile.Reader<Text, Text>(conf, in, rfs.getFileStatus(path).getLen(),
+          codec, null);
     reader.close();
 
     // test check sum

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReduceTask.java Wed Aug 20 18:39:03 2014
@@ -80,7 +80,7 @@ public class TestReduceTask extends Test
     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
     Path path = new Path(tmpDir, "data.in");
     IFile.Writer<Text, Text> writer = 
-      new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
+      new IFile.Writer<Text, Text>(conf, rfs.create(path), Text.class, Text.class,
                                    codec, null);
     for(Pair p: vals) {
       writer.append(new Text(p.key), new Text(p.value));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1619197&r1=1619196&r2=1619197&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Wed Aug 20 18:39:03 2014
@@ -95,9 +95,9 @@ public class TestPipeApplication {
             new Counters.Counter(), new Progress());
     FileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
-    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-            new Path(workSpace + File.separator + "outfile"), IntWritable.class,
-            Text.class, null, null);
+    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+            new Path(workSpace + File.separator + "outfile")), IntWritable.class,
+            Text.class, null, null, true);
     output.setWriter(wr);
     // stub for client
     File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeApplicationRunnableStub");
@@ -177,9 +177,9 @@ public class TestPipeApplication {
             new Progress());
     FileSystem fs = new RawLocalFileSystem();
     fs.setConf(conf);
-    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs,
-            new Path(workSpace.getAbsolutePath() + File.separator + "outfile"),
-            IntWritable.class, Text.class, null, null);
+    Writer<IntWritable, Text> wr = new Writer<IntWritable, Text>(conf, fs.create(
+            new Path(workSpace.getAbsolutePath() + File.separator + "outfile")),
+            IntWritable.class, Text.class, null, null, true);
    output.setWriter(wr);
    conf.set(Submitter.PRESERVE_COMMANDFILE, "true");