Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 516175143 -> 3c51654d5
Fixing MR intermediate spills. Contributed by Arun Suresh.

(cherry picked from commit 6b710a42e00acca405e085724c89cda016cf7442)
(cherry picked from commit 87862970f15e980eaf0b25e3eaf507becf349ae5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9d7bbd9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9d7bbd9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9d7bbd9

Branch: refs/heads/branch-2.7
Commit: d9d7bbd99b533da5ca570deb3b8dc8a959c6b4db
Parents: 5161751
Author: Vinod Kumar Vavilapalli <[email protected]>
Authored: Thu May 14 16:07:56 2015 -0700
Committer: Vinod Kumar Vavilapalli <[email protected]>
Committed: Thu May 14 17:00:56 2015 -0700

----------------------------------------------------------------------
 .../hadoop/mapred/LocalContainerLauncher.java   | 10 +++++++
 .../hadoop/mapred/TaskAttemptListenerImpl.java  | 13 ++++++---
 .../org/apache/hadoop/mapred/YarnChild.java     | 18 ++++++++++++
 .../hadoop/mapreduce/v2/app/MRAppMaster.java    | 24 +++++++++++++++-
 .../mapred/TestTaskAttemptListenerImpl.java     |  4 +--
 .../hadoop/mapreduce/v2/app/TestFail.java       |  2 +-
 .../java/org/apache/hadoop/mapred/Task.java     | 25 ++++++++++++++++
 .../apache/hadoop/mapreduce/CryptoUtils.java    | 17 ++++++-----
 .../apache/hadoop/mapreduce/JobSubmitter.java   | 15 ++++------
 .../hadoop/mapreduce/security/TokenCache.java   | 10 +++++++
 .../mapreduce/task/reduce/LocalFetcher.java     |  6 ++--
 .../src/site/markdown/EncryptedShuffle.md       |  8 ++++++
 .../mapreduce/task/reduce/TestMerger.java       |  2 +-
 .../TestMRIntermediateDataEncryption.java       | 30 ++++++++++++++------
 .../apache/hadoop/mapred/TestMapProgress.java   | 14 +++++----
 15 files changed, 154 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 218ac83..b30a695 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -82,6 +82,7 @@ public class LocalContainerLauncher extends AbstractService implements
   private final TaskUmbilicalProtocol umbilical;
   private ExecutorService taskRunner;
   private Thread eventHandler;
+  private byte[] encryptedSpillKey = new byte[] {0};
   private BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
 
@@ -156,6 +157,11 @@ public class LocalContainerLauncher extends AbstractService implements
     }
   }
 
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
 
   /*
    * Uber-AM lifecycle/ordering ("normal" case):
@@ -354,6 +360,10 @@ public class LocalContainerLauncher extends AbstractService implements
       //    map to handle)
       conf.setBoolean("mapreduce.task.uberized", true);
 
+      // Check and handle Encrypted spill key
+      task.setEncryptedSpillKey(encryptedSpillKey);
+      YarnChild.setEncryptedSpillKeyIfRequired(task);
+
       // META-FIXME: do we want the extra sanity-checking (doneWithMaps,
      //             etc.), or just assume/hope the state machine(s) and uber-AM work
       //             as expected?
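A note on the sentinel convention the launcher change above relies on: the key field defaults to a one-byte {0} placeholder rather than null, and the setters ignore null, so downstream code can tell "no real key" apart by length alone (YarnChild tests length > 1 further down). A minimal, self-contained sketch of that convention; the demo class and helper names are hypothetical, only the guard logic mirrors the patch:

    // Hypothetical demo of the sentinel-key convention; only the guards mirror the patch.
    public class SpillKeySentinelDemo {
      private static byte[] encryptedSpillKey = new byte[] {0};  // default placeholder

      static void setEncryptedSpillKey(byte[] key) {
        if (key != null) {               // same null-guard as the setter above
          encryptedSpillKey = key;
        }
      }

      static boolean hasRealKey() {
        // YarnChild.setEncryptedSpillKeyIfRequired uses this same length > 1 test
        return encryptedSpillKey != null && encryptedSpillKey.length > 1;
      }

      public static void main(String[] args) {
        System.out.println(hasRealKey());            // false: still the placeholder
        setEncryptedSpillKey(null);                  // ignored by the guard
        setEncryptedSpillKey(new byte[] {1, 2, 3});  // a "real" key
        System.out.println(hasRealKey());            // true
      }
    }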

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index c6b90bc..6627604 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -81,17 +81,21 @@ public class TaskAttemptListenerImpl extends CompositeService
     jvmIDToActiveAttemptMap
       = new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
   private Set<WrappedJvmID> launchedJVMs = Collections
-      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>()); 
+      .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
 
-  private JobTokenSecretManager jobTokenSecretManager = null;
-  
+  private JobTokenSecretManager jobTokenSecretManager = null;
+
+  private byte[] encryptedSpillKey;
+
   public TaskAttemptListenerImpl(AppContext context,
       JobTokenSecretManager jobTokenSecretManager,
-      RMHeartbeatHandler rmHeartbeatHandler) {
+      RMHeartbeatHandler rmHeartbeatHandler,
+      byte[] secretShuffleKey) {
     super(TaskAttemptListenerImpl.class.getName());
     this.context = context;
     this.jobTokenSecretManager = jobTokenSecretManager;
     this.rmHeartbeatHandler = rmHeartbeatHandler;
+    this.encryptedSpillKey = secretShuffleKey;
   }
 
   @Override
@@ -439,6 +443,7 @@ public class TaskAttemptListenerImpl extends CompositeService
         jvmIDToActiveAttemptMap.remove(wJvmID);
         launchedJVMs.remove(wJvmID);
         LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+        task.setEncryptedSpillKey(encryptedSpillKey);
         jvmTask = new JvmTask(task, false);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
index fec13a8..7f3111f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java
@@ -159,6 +159,7 @@ class YarnChild {
         @Override
         public Object run() throws Exception {
           // use job-specified working directory
+          setEncryptedSpillKeyIfRequired(taskFinal);
           FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
           taskFinal.run(job, umbilical); // run the task
           return null;
@@ -218,6 +219,23 @@ class YarnChild {
   }
 
   /**
+   * Utility method to check if the Encrypted Spill Key needs to be set into the
+   * user credentials of the user running the Map / Reduce Task
+   * @param task The Map / Reduce task to set the Encrypted Spill information in
+   * @throws Exception
+   */
+  public static void setEncryptedSpillKeyIfRequired(Task task) throws
+          Exception {
+    if ((task != null) && (task.getEncryptedSpillKey() != null) && (task
+            .getEncryptedSpillKey().length > 1)) {
+      Credentials creds =
+              UserGroupInformation.getCurrentUser().getCredentials();
+      TokenCache.setEncryptedSpillKey(task.getEncryptedSpillKey(), creds);
+      UserGroupInformation.getCurrentUser().addCredentials(creds);
+    }
+  }
+
+  /**
    * Configure mapred-local dirs. This config is used by the task for finding
    * out an output directory.
   * @throws IOException
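setEncryptedSpillKeyIfRequired() above boils down to copying the task's key into the current UGI's Credentials under a secret-key alias. A standalone sketch of that Credentials round trip, using Hadoop's real Credentials/Text API; the demo class is hypothetical, and the alias string matches the one TokenCache registers later in this patch:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;

    // Hypothetical demo class; Credentials and Text are the real Hadoop APIs.
    public class SpillKeyCredentialsDemo {
      public static void main(String[] args) {
        Credentials creds = new Credentials();
        byte[] spillKey = {0x0a, 0x0b, 0x0c, 0x0d};           // stand-in for the AM-generated key
        Text alias = new Text("MapReduceEncryptedSpillKey");  // alias used by TokenCache below
        creds.addSecretKey(alias, spillKey);      // what TokenCache.setEncryptedSpillKey() does
        byte[] back = creds.getSecretKey(alias);  // what TokenCache.getEncryptedSpillKey() reads
        System.out.println(back.length);          // prints 4
      }
    }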

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 835ad09..26caa2d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.hadoop.mapred.LocalContainerLauncher;
 import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.CryptoUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -145,6 +147,8 @@ import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.crypto.KeyGenerator;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -172,6 +176,7 @@ public class MRAppMaster extends CompositeService {
    * Priority of the MRAppMaster shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+  public static final String INTERMEDIATE_DATA_ENCRYPTION_ALGO = "HmacSHA1";
 
   private Clock clock;
   private final long startTime;
@@ -202,6 +207,7 @@ public class MRAppMaster extends CompositeService {
   private JobEventDispatcher jobEventDispatcher;
   private JobHistoryEventHandler jobHistoryEventHandler;
   private SpeculatorEventDispatcher speculatorEventDispatcher;
+  private byte[] encryptedSpillKey;
 
   private Job job;
   private Credentials jobCredentials = new Credentials(); // Filled during init
@@ -646,8 +652,22 @@ public class MRAppMaster extends CompositeService {
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
       this.jobCredentials = ((JobConf)conf).getCredentials();
+      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+        int keyLen = conf.getInt(
+            MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+            MRJobConfig
+                .DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
+        KeyGenerator keyGen =
+            KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
+        keyGen.init(keyLen);
+        encryptedSpillKey = keyGen.generateKey().getEncoded();
+      } else {
+        encryptedSpillKey = new byte[] {0};
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
+    } catch (NoSuchAlgorithmException e) {
+      throw new YarnRuntimeException(e);
     }
   }
 
@@ -703,7 +723,7 @@ public class MRAppMaster extends CompositeService {
   protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
     TaskAttemptListener lis =
         new TaskAttemptListenerImpl(context, jobTokenSecretManager,
-            getRMHeartbeatHandler());
+            getRMHeartbeatHandler(), encryptedSpillKey);
     return lis;
   }
 
@@ -870,6 +890,8 @@ public class MRAppMaster extends CompositeService {
     if (job.isUber()) {
       this.containerLauncher = new LocalContainerLauncher(context,
           (TaskUmbilicalProtocol) taskAttemptListener);
+      ((LocalContainerLauncher) this.containerLauncher)
+          .setEncryptedSpillKey(encryptedSpillKey);
     } else {
       this.containerLauncher = new ContainerLauncherImpl(context);
     }
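The AM-side key generation above is plain JCE. A runnable, JDK-only sketch of the same call sequence, assuming the 128-bit default of the *_KEY_SIZE_BITS setting (the demo class is hypothetical):

    import javax.crypto.KeyGenerator;

    // Hypothetical demo class; mirrors the KeyGenerator usage added to MRAppMaster.
    public class SpillKeyGenDemo {
      public static void main(String[] args) throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
        keyGen.init(128);  // key length in bits; assumed default of the job setting
        byte[] encryptedSpillKey = keyGen.generateKey().getEncoded();
        System.out.println(encryptedSpillKey.length);  // prints 16 for a 128-bit key
      }
    }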

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index b851def..d35d1e9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -59,14 +59,14 @@ public class TestTaskAttemptListenerImpl {
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
     }
 
     public MockTaskAttemptListenerImpl(AppContext context,
         JobTokenSecretManager jobTokenSecretManager,
         RMHeartbeatHandler rmHeartbeatHandler,
         TaskHeartbeatHandler hbHandler) {
-      super(context, jobTokenSecretManager, rmHeartbeatHandler);
+      super(context, jobTokenSecretManager, rmHeartbeatHandler, null);
       this.taskHeartbeatHandler = hbHandler;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 3b845f9..b7afbbb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -253,7 +253,7 @@ public class TestFail {
       //task time out is reduced
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
-      return new TaskAttemptListenerImpl(getContext(), null, null) {
+      return new TaskAttemptListenerImpl(getContext(), null, null, null) {
         @Override
         public void startRpcServer(){};
         @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 7bd9b31..8cb0402 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -148,6 +148,8 @@ abstract public class Task implements Writable, Configurable {
   private String user;                            // user running the job
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
+  private byte[] encryptedSpillKey = new byte[] {0};  // Key Used to encrypt
+                                                      // intermediate spills
   TaskStatus taskStatus;                          // current status of the task
   protected JobStatus.State jobRunStateForCleanup;
   protected boolean jobCleanup = false;
@@ -256,6 +258,24 @@ abstract public class Task implements Writable, Configurable {
   }
 
   /**
+   * Get Encrypted spill key
+   * @return encrypted spill key
+   */
+  public byte[] getEncryptedSpillKey() {
+    return encryptedSpillKey;
+  }
+
+  /**
+   * Set Encrypted spill key
+   * @param encryptedSpillKey key
+   */
+  public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
+    if (encryptedSpillKey != null) {
+      this.encryptedSpillKey = encryptedSpillKey;
+    }
+  }
+
+  /**
    * Get the job token secret
    * @return the token secret
    */
@@ -485,6 +505,8 @@ abstract public class Task implements Writable, Configurable {
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);
     Text.writeString(out, user);
+    out.writeInt(encryptedSpillKey.length);
+    out.write(encryptedSpillKey);
     extraData.write(out);
   }
 
@@ -510,6 +532,9 @@ abstract public class Task implements Writable, Configurable {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
     user = StringInterner.weakIntern(Text.readString(in));
+    int len = in.readInt();
+    encryptedSpillKey = new byte[len];
+    in.readFully(encryptedSpillKey);
     extraData.readFields(in);
   }
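The Task wire-format change is a length-prefixed byte array appended between the user string and extraData. A JDK-only sketch of the same framing, pairing writeInt/write with readInt/readFully exactly as write()/readFields() do above; the demo class is hypothetical:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    // Hypothetical demo class; the framing matches Task.write()/readFields().
    public class SpillKeyFramingDemo {
      public static void main(String[] args) throws IOException {
        byte[] key = {1, 2, 3};
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(key.length);  // as in Task.write()
        out.write(key);

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        int len = in.readInt();    // as in Task.readFields()
        byte[] roundTripped = new byte[len];
        in.readFully(roundTripped);
        System.out.println(Arrays.equals(key, roundTripped));  // prints true
      }
    }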

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
index ef06176..c4130b1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataInputStream;
 import org.apache.hadoop.fs.crypto.CryptoFSDataOutputStream;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.LimitInputStream;
@@ -50,7 +49,7 @@ public class CryptoUtils {
 
   private static final Log LOG = LogFactory.getLog(CryptoUtils.class);
 
-  public static boolean isShuffleEncrypted(Configuration conf) {
+  public static boolean isEncryptedSpillEnabled(Configuration conf) {
     return conf.getBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA,
         MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA);
   }
@@ -64,7 +63,7 @@ public class CryptoUtils {
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       byte[] iv = new byte[cryptoCodec.getCipherSuite().getAlgorithmBlockSize()];
       cryptoCodec.generateSecureRandom(iv);
       return iv;
@@ -75,13 +74,13 @@ public class CryptoUtils {
 
   public static int cryptoPadding(Configuration conf) {
     // Sizeof(IV) + long(start-offset)
-    return isShuffleEncrypted(conf) ? CryptoCodec.getInstance(conf)
+    return isEncryptedSpillEnabled(conf) ? CryptoCodec.getInstance(conf)
         .getCipherSuite().getAlgorithmBlockSize() + 8 : 0;
   }
 
   private static byte[] getEncryptionKey() throws IOException {
-    return TokenCache.getShuffleSecretKey(UserGroupInformation.getCurrentUser()
-        .getCredentials());
+    return TokenCache.getEncryptedSpillKey(UserGroupInformation.getCurrentUser()
+        .getCredentials());
   }
 
   private static int getBufferSize(Configuration conf) {
@@ -102,7 +101,7 @@ public class CryptoUtils {
   */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
       out.write(iv);
@@ -137,7 +136,7 @@ public class CryptoUtils {
   */
   public static InputStream wrapIfNecessary(Configuration conf,
       InputStream in, long length) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       int bufferSize = getBufferSize(conf);
       if (length > -1) {
         in = new LimitInputStream(in, length);
@@ -174,7 +173,7 @@ public class CryptoUtils {
   */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {
-    if (isShuffleEncrypted(conf)) {
+    if (isEncryptedSpillEnabled(conf)) {
       CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
       int bufferSize = getBufferSize(conf);
       // Not going to be used... but still has to be read...
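cryptoPadding() above reports the per-stream overhead that encryption prepends: one cipher block for the IV plus an 8-byte long recording the stream's start offset. A small sketch of the arithmetic the shuffle code performs with it, assuming a 16-byte AES block size (the values are illustrative, the formula is the patch's):

    // Hypothetical demo class; constants are illustrative, the formula is the patch's.
    public class CryptoPaddingDemo {
      public static void main(String[] args) {
        int blockSize = 16;                 // assumed AES algorithm block size
        int cryptoPadding = blockSize + 8;  // sizeof(IV) + long(start-offset) = 24
        long partLength = 1_048_600L;       // hypothetical on-disk segment length
        long compressedLength = partLength - cryptoPadding;  // as LocalFetcher adjusts below
        System.out.println(compressedLength);  // payload bytes actually shuffled
      }
    }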

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
index 31a3e2a..adfbf04 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
@@ -18,12 +18,10 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
@@ -175,13 +172,8 @@ class JobSubmitter {
     if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
       KeyGenerator keyGen;
       try {
-
-        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
-            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
-                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
-            : SHUFFLE_KEY_LENGTH;
         keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
-        keyGen.init(keyLen);
+        keyGen.init(SHUFFLE_KEY_LENGTH);
       } catch (NoSuchAlgorithmException e) {
         throw new IOException("Error generating shuffle secret key", e);
       }
@@ -189,6 +181,11 @@ class JobSubmitter {
       TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
           job.getCredentials());
     }
+    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
+      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
+      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
+              "data spill is enabled");
+    }
 
     copyAndConfigureFiles(job, submitJobDir);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
index 7b1f657..77b2d78 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
@@ -158,6 +158,7 @@ public class TokenCache {
   public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
   private static final Text JOB_TOKEN = new Text("JobToken");
   private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
+  private static final Text ENC_SPILL_KEY = new Text("MapReduceEncryptedSpillKey");
 
   /**
    * load job token from a file
@@ -226,6 +227,15 @@ public class TokenCache {
     return getSecretKey(credentials, SHUFFLE_TOKEN);
   }
 
+  @InterfaceAudience.Private
+  public static void setEncryptedSpillKey(byte[] key, Credentials credentials) {
+    credentials.addSecretKey(ENC_SPILL_KEY, key);
+  }
+
+  @InterfaceAudience.Private
+  public static byte[] getEncryptedSpillKey(Credentials credentials) {
+    return getSecretKey(credentials, ENC_SPILL_KEY);
+  }
 
   /**
    * @deprecated Use {@link Credentials#getToken(org.apache.hadoop.io.Text)}
   * instead, this method is included for compatibility against Hadoop-1
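A note on why JobSubmitter pins the AM attempt count: the spill key is generated in AM memory and never persisted, so a second AM attempt would presumably be unable to read spills encrypted by the first, which is why the submitter forces a single attempt (see also the NOTE added to the docs below). A sketch of the resulting client-side configuration, using the property names behind MRJobConfig.MR_AM_MAX_ATTEMPTS and MR_ENCRYPTED_INTERMEDIATE_DATA; the demo class is hypothetical:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical demo of the client-side effect of the JobSubmitter change.
    public class EncryptedSpillConfDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false);  // empty conf, no site files
        conf.setBoolean("mapreduce.job.encrypted-intermediate-data", true);
        // JobSubmitter now does the equivalent of:
        conf.setInt("mapreduce.am.max-attempts", 1);
        System.out.println(conf.getInt("mapreduce.am.max-attempts", 2));  // prints 1
      }
    }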

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 6794c99..de2382c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -127,6 +127,9 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     long compressedLength = ir.partLength;
     long decompressedLength = ir.rawLength;
 
+    compressedLength -= CryptoUtils.cryptoPadding(job);
+    decompressedLength -= CryptoUtils.cryptoPadding(job);
+
     // Get the location for the map output - either in-memory or on-disk
     MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
         id);
@@ -150,8 +153,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
     inStream = CryptoUtils.wrapIfNecessary(job, inStream);
 
     try {
-      inStream.seek(ir.startOffset);
-
+      inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
       mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
           decompressedLength, metrics, reporter);
     } finally {
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
index 58fd52a..c23be7a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/EncryptedShuffle.md
@@ -253,3 +253,11 @@ You can do this on a per-job basis, or by means of a cluster-wide setting in th
 To set this property in NodeManager, set it in the `yarn-env.sh` file:
 
     YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all"
+
+Encrypted Intermediate Data Spill files
+---------------------------------------
+
+This capability allows encryption of the intermediate files generated during the merge and shuffle phases.
+It can be enabled by setting the `mapreduce.job.encrypted-intermediate-data` job property to `true`.
+
+**NOTE:** Currently, enabling encrypted intermediate data spills would restrict the number of attempts of the job to 1.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
index 6e3bedf..a6b1964 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
@@ -87,7 +87,7 @@ public class TestMerger {
     jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
-    TokenCache.setShuffleSecretKey(new byte[16], credentials);
+    TokenCache.setEncryptedSpillKey(new byte[16], credentials);
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
     testInMemoryAndOnDiskMerger();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
index ebc32ad..28b2295 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRIntermediateDataEncryption.java
@@ -52,24 +52,31 @@ public class TestMRIntermediateDataEncryption {
 
   @Test
   public void testSingleReducer() throws Exception {
-    doEncryptionTest(3, 1, 2);
+    doEncryptionTest(3, 1, 2, false);
+  }
+
+  @Test
+  public void testUberMode() throws Exception {
+    doEncryptionTest(3, 1, 2, true);
   }
 
   @Test
   public void testMultipleMapsPerNode() throws Exception {
-    doEncryptionTest(8, 1, 2);
+    doEncryptionTest(8, 1, 2, false);
   }
 
   @Test
   public void testMultipleReducers() throws Exception {
-    doEncryptionTest(2, 4, 2);
+    doEncryptionTest(2, 4, 2, false);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
-    doEncryptionTest(numMappers, numReducers, numNodes, 1000);
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+      boolean isUber) throws Exception {
+    doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
   }
 
-  public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
+  public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
+      int numLines, boolean isUber) throws Exception {
     MiniDFSCluster dfsCluster = null;
     MiniMRClientCluster mrCluster = null;
     FileSystem fileSystem = null;
@@ -85,7 +92,8 @@ public class TestMRIntermediateDataEncryption {
       // Generate input.
       createInput(fileSystem, numMappers, numLines);
       // Run the test.
-      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
+      runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
+          numMappers, numReducers, numLines, isUber);
     } finally {
       if (dfsCluster != null) {
         dfsCluster.shutdown();
@@ -111,8 +119,9 @@ public class TestMRIntermediateDataEncryption {
     }
   }
 
-  private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
-      throws Exception {
+  private void runMergeTest(JobConf job, FileSystem fileSystem, int
+      numMappers, int numReducers, int numLines, boolean isUber)
+      throws Exception {
     fileSystem.delete(OUTPUT, true);
     job.setJobName("Test");
     JobClient client = new JobClient(job);
@@ -133,6 +142,9 @@ public class TestMRIntermediateDataEncryption {
     job.setInt("mapreduce.map.maxattempts", 1);
     job.setInt("mapreduce.reduce.maxattempts", 1);
     job.setInt("mapred.test.num_lines", numLines);
+    if (isUber) {
+      job.setBoolean("mapreduce.job.ubertask.enable", true);
+    }
     job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
     try {
       submittedJob = client.submitJob(job);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9d7bbd9/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
index bb4a2de..1fe549b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
@@ -118,12 +118,14 @@ public class TestMapProgress extends TestCase {
         throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
-      buf.append(" making progress to ");
-      buf.append(taskStatus.getProgress());
-      String state = taskStatus.getStateString();
-      if (state != null) {
-        buf.append(" and state of ");
-        buf.append(state);
+      if (taskStatus != null) {
+        buf.append(" making progress to ");
+        buf.append(taskStatus.getProgress());
+        String state = taskStatus.getStateString();
+        if (state != null) {
+          buf.append(" and state of ");
+          buf.append(state);
+        }
       }
       LOG.info(buf.toString());
       // ignore phase
