Author: bobby Date: Fri Mar 15 21:11:45 2013 New Revision: 1457121 URL: http://svn.apache.org/r1457121 Log: svn merge -c 1457119 FIXES: MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered (Jason Lowe via bobby)
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Mar 15 21:11:45 2013 @@ -641,6 +641,9 @@ Release 0.23.7 - UNRELEASED MAPREDUCE-5060. Fetch failures that time out only count against the first map task (Robert Joseph Evans via jlowe) + MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered + (Jason Lowe via bobby) + Release 0.23.6 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java Fri Mar 15 21:11:45 2013 @@ -269,9 +269,17 @@ class YarnChild { job.setBoolean("ipc.client.tcpnodelay", true); job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class, MapOutputFile.class); - // set the jobTokenFile into task + // set the jobToken and shuffle secrets into task task.setJobTokenSecret( JobTokenSecretManager.createSecretKey(jt.getPassword())); + byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); + if (shuffleSecret == null) { + LOG.warn("Shuffle secret missing from task credentials." + + " Using job token secret as shuffle secret."); + shuffleSecret = jt.getPassword(); + } + task.setShuffleSecret( + JobTokenSecretManager.createSecretKey(shuffleSecret)); // setup the child's MRConfig.LOCAL_DIR. configureLocalDirs(task, job); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Fri Mar 15 21:11:45 2013 @@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; @@ -339,8 +340,15 @@ public class MRAppMaster extends Composi boolean recoveryEnabled = conf.getBoolean( MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); boolean recoverySupportedByCommitter = committer.isRecoverySupported(); + + // If a shuffle secret was not provided by the job client then this app + // attempt will generate one. However that disables recovery if there + // are reducers as the shuffle secret would be app attempt specific. + boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 && + TokenCache.getShuffleSecretKey(fsTokens) != null); + if (recoveryEnabled && recoverySupportedByCommitter - && appAttemptID.getAttemptId() > 1) { + && shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) { LOG.info("Recovery is enabled. " + "Will try to recover from previous life on best effort basis."); recoveryServ = createRecoveryService(context); @@ -351,7 +359,8 @@ public class MRAppMaster extends Composi } else { LOG.info("Not starting RecoveryService: recoveryEnabled: " + recoveryEnabled + " recoverySupportedByCommitter: " - + recoverySupportedByCommitter + " ApplicationAttemptID: " + + recoverySupportedByCommitter + " shuffleKeyValidForRecovery: " + + shuffleKeyValidForRecovery + " ApplicationAttemptID: " + appAttemptID.getAttemptId()); dispatcher = createDispatcher(); addIfService(dispatcher); @@ -471,7 +480,11 @@ public class MRAppMaster extends Composi protected FileSystem getFileSystem(Configuration conf) throws IOException { return FileSystem.get(conf); } - + + protected Credentials getCredentials() { + return fsTokens; + } + /** * clean up staging directories for the job. * @throws IOException Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Mar 15 21:11:45 2013 @@ -1350,13 +1350,13 @@ public class JobImpl implements org.apac LOG.info("Adding job token for " + oldJobIDString + " to jobTokenSecretManager"); - // Upload the jobTokens onto the remote FS so that ContainerManager can - // localize it to be used by the Containers(tasks) - Credentials tokenStorage = new Credentials(); - TokenCache.setJobToken(job.jobToken, tokenStorage); - - if (UserGroupInformation.isSecurityEnabled()) { - tokenStorage.addAll(job.fsTokens); + // If the job client did not setup the shuffle secret then reuse + // the job token secret for the shuffle. + if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) { + LOG.warn("Shuffle secret key missing from job credentials." + + " Using job token secret as shuffle secret."); + TokenCache.setShuffleSecretKey(job.jobToken.getPassword(), + job.fsTokens); } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Mar 15 21:11:45 2013 @@ -702,10 +702,21 @@ public abstract class TaskAttemptImpl im ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength()); - // Add shuffle token + // Add shuffle secret key + // The secret key is converted to a JobToken to preserve backwards + // compatibility with an older ShuffleHandler running on an NM. LOG.info("Putting shuffle token in serviceData"); + byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials); + if (shuffleSecret == null) { + LOG.warn("Cannot locate shuffle secret in credentials." + + " Using job token as shuffle secret."); + shuffleSecret = jobToken.getPassword(); + } + Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>( + jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(), + jobToken.getService()); serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, - ShuffleHandler.serializeServiceData(jobToken)); + ShuffleHandler.serializeServiceData(shuffleToken)); Apps.addToEnvironment( environment, Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Fri Mar 15 21:11:45 2013 @@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskA import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -144,6 +145,9 @@ public class MRApp extends MRAppMaster { @Override protected void downloadTokensAndSetupUGI(Configuration conf) { + // Fake a shuffle secret that normally is provided by the job client. + String shuffleSecret = "fake-shuffle-secret"; + TokenCache.setShuffleSecretKey(shuffleSecret.getBytes(), getCredentials()); } private static ApplicationAttemptId getApplicationAttemptId( Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Fri Mar 15 21:11:45 2013 @@ -900,6 +900,117 @@ public class TestRecovery { } + @Test(timeout=30000) + public void testRecoveryWithoutShuffleSecret() throws Exception { + + int runCount = 0; + MRApp app = new MRAppNoShuffleSecret(2, 1, false, + this.getClass().getName(), true, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + Iterator<Task> it = job.getTasks().values().iterator(); + Task mapTask1 = it.next(); + Task mapTask2 = it.next(); + Task reduceTask = it.next(); + + // all maps must be running + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next(); + TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next(); + + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task1Attempt, TaskAttemptState.RUNNING); + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + // reduces must be in NEW state + Assert.assertEquals("Reduce Task state not correct", + TaskState.RUNNING, reduceTask.getReport().getTaskState()); + + //send the done signal to the 1st map attempt + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait for first map task to complete + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + //stop the app + app.stop(); + + //in recovery the 1st map should NOT be recovered from previous run + //since the shuffle secret was not provided with the job credentials + //and had to be rolled per app attempt + app = new MRAppNoShuffleSecret(2, 1, false, + this.getClass().getName(), false, ++runCount); + conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + mapTask2 = it.next(); + reduceTask = it.next(); + + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + task2Attempt = mapTask2.getAttempts().values().iterator().next(); + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + //send the done signal to the 2nd map task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapTask2.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + //wait to get it completed + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + //verify first map task is still running + app.waitForState(mapTask1, TaskState.RUNNING); + + //send the done signal to the 2nd map task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapTask1.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + //wait to get it completed + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + //wait for reduce to be running before sending done + app.waitForState(reduceTask, TaskState.RUNNING); + //send the done signal to the reduce + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + reduceTask.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + } + private void writeBadOutput(TaskAttempt attempt, Configuration conf) throws Exception { TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, @@ -1019,6 +1130,18 @@ public class TestRecovery { } } + static class MRAppNoShuffleSecret extends MRAppWithHistory { + public MRAppNoShuffleSecret(int maps, int reduces, boolean autoComplete, + String testName, boolean cleanOnStart, int startCount) { + super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); + } + + @Override + protected void downloadTokensAndSetupUGI(Configuration conf) { + // do NOT put a shuffle secret in the job credentials + } + } + public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Mar 15 21:11:45 2013 @@ -491,7 +491,7 @@ public class TestJobImpl { MRAppMetrics mrAppMetrics = MRAppMetrics.create(); JobImpl job = new JobImpl(jobId, Records .newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class), - null, mock(JobTokenSecretManager.class), null, null, null, + null, new JobTokenSecretManager(), new Credentials(), null, null, mrAppMetrics, true, null, 0, null, null, null, null); InitTransition initTransition = getInitTransition(2); JobEvent mockJobEvent = mock(JobEvent.class); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Fri Mar 15 21:11:45 2013 @@ -185,6 +185,7 @@ abstract public class Task implements Wr private int numSlotsRequired; protected TaskUmbilicalProtocol umbilical; protected SecretKey tokenSecret; + protected SecretKey shuffleSecret; protected GcTimeUpdater gcUpdater; //////////////////////////////////////////// @@ -261,7 +262,22 @@ abstract public class Task implements Wr return this.tokenSecret; } - + /** + * Set the secret key used to authenticate the shuffle + * @param shuffleSecret the secret + */ + public void setShuffleSecret(SecretKey shuffleSecret) { + this.shuffleSecret = shuffleSecret; + } + + /** + * Get the secret key used to authenticate the shuffle + * @return the shuffle secret + */ + public SecretKey getShuffleSecret() { + return this.shuffleSecret; + } + /** * Get the index of this task within the job. * @return the integer part of the task id Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Mar 15 21:11:45 2013 @@ -23,11 +23,15 @@ import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -62,6 +66,8 @@ import com.google.common.base.Charsets; @InterfaceStability.Unstable class JobSubmitter { protected static final Log LOG = LogFactory.getLog(JobSubmitter.class); + private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; + private static final int SHUFFLE_KEY_LENGTH = 64; private FileSystem jtFs; private ClientProtocol submitClient; private String submitHostName; @@ -359,6 +365,20 @@ class JobSubmitter { populateTokenCache(conf, job.getCredentials()); + // generate a secret to authenticate shuffle transfers + if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { + KeyGenerator keyGen; + try { + keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); + keyGen.init(SHUFFLE_KEY_LENGTH); + } catch (NoSuchAlgorithmException e) { + throw new IOException("Error generating shuffle secret key", e); + } + SecretKey shuffleKey = keyGen.generateKey(); + TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), + job.getCredentials()); + } + copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Fri Mar 15 21:11:45 2013 @@ -154,7 +154,8 @@ public class TokenCache { */ @InterfaceAudience.Private public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile"; - private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken"); + private static final Text JOB_TOKEN = new Text("JobToken"); + private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken"); /** * load job token from a file @@ -194,4 +195,14 @@ public class TokenCache { public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) { return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN); } + + @InterfaceAudience.Private + public static void setShuffleSecretKey(byte[] key, Credentials credentials) { + credentials.addSecretKey(SHUFFLE_TOKEN, key); + } + + @InterfaceAudience.Private + public static byte[] getShuffleSecretKey(Credentials credentials) { + return getSecretKey(credentials, SHUFFLE_TOKEN); + } } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Mar 15 21:11:45 2013 @@ -82,7 +82,7 @@ class Fetcher<K,V> extends Thread { private final int connectionTimeout; private final int readTimeout; - private final SecretKey jobTokenSecret; + private final SecretKey shuffleSecretKey; private volatile boolean stopped = false; @@ -92,7 +92,7 @@ class Fetcher<K,V> extends Thread { public Fetcher(JobConf job, TaskAttemptID reduceId, ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger, Reporter reporter, ShuffleClientMetrics metrics, - ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) { + ExceptionReporter exceptionReporter, SecretKey shuffleKey) { this.reporter = reporter; this.scheduler = scheduler; this.merger = merger; @@ -100,7 +100,7 @@ class Fetcher<K,V> extends Thread { this.exceptionReporter = exceptionReporter; this.id = ++nextId; this.reduce = reduceId.getTaskID().getId(); - this.jobTokenSecret = jobTokenSecret; + this.shuffleSecretKey = shuffleKey; ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString()); wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, @@ -228,7 +228,8 @@ class Fetcher<K,V> extends Thread { // generate hash of the url String msgToEncode = SecureShuffleUtils.buildMsgFrom(url); - String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret); + String encHash = SecureShuffleUtils.hashFromString(msgToEncode, + shuffleSecretKey); // put url hash into http header connection.addRequestProperty( @@ -253,7 +254,7 @@ class Fetcher<K,V> extends Thread { } LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash); // verify that replyHash is HMac of encHash - SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret); + SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey); LOG.info("for url="+msgToEncode+" sent hash and received reply"); } catch (IOException ie) { boolean connectExcpt = ie instanceof ConnectException; Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Fri Mar 15 21:11:45 2013 @@ -108,7 +108,7 @@ public class Shuffle<K, V> implements Sh for (int i=0; i < numFetchers; ++i) { fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger, reporter, metrics, this, - reduceTask.getJobTokenSecret()); + reduceTask.getShuffleSecret()); fetchers[i].start(); } Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java?rev=1457121&r1=1457120&r2=1457121&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java Fri Mar 15 21:11:45 2013 @@ -47,6 +47,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.Counters.Group; @@ -106,7 +107,7 @@ public class TestPipeApplication { Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>( "user".getBytes(), "password".getBytes(), new Text("kind"), new Text( "service")); - conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token); + TokenCache.setJobToken(token, conf.getCredentials()); conf.setBoolean(MRJobConfig.SKIP_RECORDS, true); TestTaskReporter reporter = new TestTaskReporter(); PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text> runner = new PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text>(); @@ -171,7 +172,7 @@ public class TestPipeApplication { "user".getBytes(), "password".getBytes(), new Text("kind"), new Text( "service")); - conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token); + TokenCache.setJobToken(token, conf.getCredentials()); FakeCollector output = new FakeCollector(new Counters.Counter(), new Progress()); FileSystem fs = new RawLocalFileSystem(); @@ -391,7 +392,7 @@ public class TestPipeApplication { Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>( "user".getBytes(), "password".getBytes(), new Text("kind"), new Text( "service")); - conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token); + TokenCache.setJobToken(token, conf.getCredentials()); File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeReducerStub"); conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());