Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Aug 19 23:49:39 2014 @@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Disp import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -535,7 +536,7 @@ public class TestJobImpl { // Verify access JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null, - null, null, null, true, null, 0, null, null, null, null); + null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -546,7 +547,7 @@ public class TestJobImpl { // Verify access JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null, - null, null, null, true, null, 0, null, null, null, null); + null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -557,7 +558,7 @@ public class TestJobImpl { // Verify access JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null, - null, null, null, true, null, 0, null, null, null, null); + null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -568,7 +569,7 @@ public class TestJobImpl { // Verify access JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null, - null, null, null, true, null, 0, null, null, null, null); + null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB)); Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB)); @@ -579,7 +580,7 @@ public class TestJobImpl { // Verify access JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null, - null, null, null, true, null, 0, null, null, null, null); + null, null, null, true, user1, 0, null, null, null, null); Assert.assertTrue(job5.checkAccess(ugi1, null)); Assert.assertTrue(job5.checkAccess(ugi2, null)); } @@ -656,6 +657,15 @@ public class TestJobImpl { conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1); isUber = testUberDecision(conf); Assert.assertFalse(isUber); + + // enable uber mode of 0 reducer no matter how much memory assigned to reducer + conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true); + conf.setInt(MRJobConfig.NUM_REDUCES, 0); + conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048); + conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10); + isUber = testUberDecision(conf); + Assert.assertTrue(isUber); } private boolean testUberDecision(Configuration conf) { @@ -728,6 +738,35 @@ public class TestJobImpl { commitHandler.stop(); } + static final String EXCEPTIONMSG = "Splits max exceeded"; + @Test + public void testMetaInfoSizeOverMax() throws Exception { + Configuration conf = new Configuration(); + JobID jobID = JobID.forName("job_1234567890000_0001"); + JobId jobId = TypeConverter.toYarn(jobID); + MRAppMetrics mrAppMetrics = MRAppMetrics.create(); + JobImpl job = + new JobImpl(jobId, ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class), + null, new JobTokenSecretManager(), new Credentials(), null, null, + mrAppMetrics, null, true, null, 0, null, null, null, null); + InitTransition initTransition = new InitTransition() { + @Override + protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) { + throw new YarnRuntimeException(EXCEPTIONMSG); + } + }; + JobEvent mockJobEvent = mock(JobEvent.class); + + JobStateInternal jobSI = initTransition.transition(job, mockJobEvent); + Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.", + jobSI.equals(JobStateInternal.NEW)); + Assert.assertTrue("Job diagnostics should contain YarnRuntimeException", + job.getDiagnostics().toString().contains("YarnRuntimeException")); + Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG, + job.getDiagnostics().toString().contains(EXCEPTIONMSG)); + } + private static CommitterEventHandler createCommitterEventHandler( Dispatcher dispatcher, OutputCommitter committer) { final SystemClock clock = new SystemClock();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Tue Aug 19 23:49:39 2014 @@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a import java.util.Map; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Aug 19 23:49:39 2014 @@ -33,7 +33,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; @@ -795,6 +796,178 @@ public class TestTaskAttempt{ finishTime, Long.valueOf(taImpl.getFinishTime())); } + @Test + public void testContainerKillAfterAssigned() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + assertEquals("Task attempt is not in assinged state", + taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + + @Test + public void testContainerKillWhileRunning() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertFalse("InternalError occurred trying to handle TA_KILL", + eventHandler.internalError); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + + @Test + public void testContainerKillWhileCommitPending() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, + 0); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0); + Path jobFile = mock(Path.class); + + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + JobConf jobConf = new JobConf(); + jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + jobConf.setBoolean("fs.file.impl.disable.cache", true); + jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, ""); + jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10"); + + TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class); + when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" }); + + AppContext appCtx = mock(AppContext.class); + ClusterInfo clusterInfo = mock(ClusterInfo.class); + Resource resource = mock(Resource.class); + when(appCtx.getClusterInfo()).thenReturn(clusterInfo); + when(resource.getMemory()).thenReturn(1024); + + TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, + jobFile, 1, splits, jobConf, taListener, new Token(), + new Credentials(), new SystemClock(), appCtx); + + NodeId nid = NodeId.newInstance("127.0.0.2", 0); + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_SCHEDULE)); + taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container, + mock(Map.class))); + taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0)); + assertEquals("Task attempt is not in running state", taImpl.getState(), + TaskAttemptState.RUNNING); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_COMMIT_PENDING)); + assertEquals("Task should be in COMMIT_PENDING state", + TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState()); + taImpl.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_KILL)); + assertFalse("InternalError occurred trying to handle TA_KILL", + eventHandler.internalError); + assertEquals("Task should be in KILLED state", + TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, + taImpl.getInternalState()); + } + public static class MockEventHandler implements EventHandler { public boolean internalError; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Tue Aug 19 23:49:39 2014 @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileStatus; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Tue Aug 19 23:49:39 2014 @@ -30,7 +30,7 @@ import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -439,4 +439,4 @@ public class TestContainerLauncher { throw new IOException(e); } } -} \ No newline at end of file +} Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Aug 19 23:49:39 2014 @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; @@ -402,7 +403,7 @@ public class TestContainerLauncherImpl { 1234), "password".getBytes(), new ContainerTokenIdentifier( contId, containerManagerAddr, "user", Resource.newInstance(1024, 1), - currentTime + 10000L, 123, currentTime)); + currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0)); } private static class ContainerManagerForTest implements ContainerManagementProtocol { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Tue Aug 19 23:49:39 2014 @@ -31,7 +31,7 @@ import java.util.Map.Entry; import javax.net.ssl.SSLException; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.httpclient.HttpStatus; import org.apache.hadoop.conf.Configuration; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java Tue Aug 19 23:49:39 2014 @@ -106,6 +106,7 @@ public class TestBlocks { when(report.getTaskState()).thenReturn(TaskState.SUCCEEDED); when(report.getStartTime()).thenReturn(100001L); when(report.getFinishTime()).thenReturn(100011L); + when(report.getStatus()).thenReturn("Dummy Status \n*"); when(task.getReport()).thenReturn(report); @@ -134,6 +135,8 @@ public class TestBlocks { assertTrue(data.toString().contains("SUCCEEDED")); assertTrue(data.toString().contains("100001")); assertTrue(data.toString().contains("100011")); + assertFalse(data.toString().contains("Dummy Status \n*")); + assertTrue(data.toString().contains("Dummy Status \\n*")); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Tue Aug 19 23:49:39 2014 @@ -168,10 +168,14 @@ public class FileNameIndexUtils { decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX])); try{ - indexInfo.setJobStartTime( - Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX]))); + if (jobDetails.length <= JOB_START_TIME_INDEX) { + indexInfo.setJobStartTime(indexInfo.getSubmitTime()); + } else { + indexInfo.setJobStartTime( + Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX]))); + } } catch (NumberFormatException e){ - LOG.warn("Unable to parse launch time from job history file " + LOG.warn("Unable to parse start time from job history file " + jhFileName + " : " + e); } } catch (IndexOutOfBoundsException e) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Tue Aug 19 23:49:39 2014 @@ -38,7 +38,9 @@ public class JHAdminConfig { public static final int DEFAULT_MR_HISTORY_PORT = 10020; public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" + DEFAULT_MR_HISTORY_PORT; - + public static final String MR_HISTORY_BIND_HOST = MR_HISTORY_PREFIX + + "bind-host"; + /** The address of the History server admin interface. */ public static final String JHS_ADMIN_ADDRESS = MR_HISTORY_PREFIX + "admin.address"; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue Aug 19 23:49:39 2014 @@ -22,20 +22,24 @@ import java.io.File; import java.io.IOException; import java.util.Calendar; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -117,6 +121,7 @@ public class JobHistoryUtils { public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}"; public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX); private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d"; + private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class); private static final PathFilter CONF_FILTER = new PathFilter() { @Override @@ -183,7 +188,7 @@ public class JobHistoryUtils { Path stagingPath = MRApps.getStagingAreaDir(conf, user); Path path = new Path(stagingPath, jobId); String logDir = path.toString(); - return logDir; + return ensurePathInDefaultFileSystem(logDir, conf); } /** @@ -200,7 +205,7 @@ public class JobHistoryUtils { MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + "/history/done_intermediate"; } - return doneDirPrefix; + return ensurePathInDefaultFileSystem(doneDirPrefix, conf); } /** @@ -216,7 +221,69 @@ public class JobHistoryUtils { MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) + "/history/done"; } - return doneDirPrefix; + return ensurePathInDefaultFileSystem(doneDirPrefix, conf); + } + + /** + * Get default file system URI for the cluster (used to ensure consistency + * of history done/staging locations) over different context + * + * @return Default file context + */ + private static FileContext getDefaultFileContext() { + // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml then we ignore + // ignore it. This prevents defaulting history paths to file system specified + // by core-default.xml which would not make sense in any case. For a test + // case to exploit this functionality it should create core-site.xml + FileContext fc = null; + Configuration defaultConf = new Configuration(); + String[] sources; + sources = defaultConf.getPropertySources( + CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY); + if (sources != null && + (!Arrays.asList(sources).contains("core-default.xml") || + sources.length > 1)) { + try { + fc = FileContext.getFileContext(defaultConf); + LOG.info("Default file system [" + + fc.getDefaultFileSystem().getUri() + "]"); + } catch (UnsupportedFileSystemException e) { + LOG.error("Unable to create default file context [" + + defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) + + "]", + e); + } + } + else { + LOG.info("Default file system is set solely " + + "by core-default.xml therefore - ignoring"); + } + + return fc; + } + + /** + * Ensure that path belongs to cluster's default file system unless + * 1. it is already fully qualified. + * 2. current job configuration uses default file system + * 3. running from a test case without core-site.xml + * + * @param sourcePath source path + * @param conf the job configuration + * @return full qualified path (if necessary) in default file system + */ + private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) { + Path path = new Path(sourcePath); + FileContext fc = getDefaultFileContext(); + if (fc == null || + fc.getDefaultFileSystem().getUri().toString().equals( + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) || + path.toUri().getAuthority() != null || + path.toUri().getScheme()!= null) { + return sourcePath; + } + + return fc.makeQualified(path).toString(); } /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Tue Aug 19 23:49:39 2014 @@ -33,6 +33,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -326,8 +327,8 @@ public class MRApps extends Apps { } /** - * Sets a {@link ApplicationClassLoader} on the given configuration and as - * the context classloader, if + * Creates and sets a {@link ApplicationClassLoader} on the given + * configuration and as the thread context classloader, if * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and * the APP_CLASSPATH environment variable is set. * @param conf @@ -335,25 +336,58 @@ public class MRApps extends Apps { */ public static void setJobClassLoader(Configuration conf) throws IOException { + setClassLoader(createJobClassLoader(conf), conf); + } + + /** + * Creates a {@link ApplicationClassLoader} if + * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and + * the APP_CLASSPATH environment variable is set. + * @param conf + * @returns the created job classloader, or null if the job classloader is not + * enabled or the APP_CLASSPATH environment variable is not set + * @throws IOException + */ + public static ClassLoader createJobClassLoader(Configuration conf) + throws IOException { + ClassLoader jobClassLoader = null; if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) { String appClasspath = System.getenv(Environment.APP_CLASSPATH.key()); if (appClasspath == null) { - LOG.warn("Not using job classloader since APP_CLASSPATH is not set."); + LOG.warn("Not creating job classloader since APP_CLASSPATH is not set."); } else { - LOG.info("Using job classloader"); + LOG.info("Creating job classloader"); if (LOG.isDebugEnabled()) { LOG.debug("APP_CLASSPATH=" + appClasspath); } - String[] systemClasses = conf.getStrings( - MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); - ClassLoader jobClassLoader = createJobClassLoader(appClasspath, + String[] systemClasses = getSystemClasses(conf); + jobClassLoader = createJobClassLoader(appClasspath, systemClasses); - if (jobClassLoader != null) { - conf.setClassLoader(jobClassLoader); - Thread.currentThread().setContextClassLoader(jobClassLoader); - } } } + return jobClassLoader; + } + + /** + * Sets the provided classloader on the given configuration and as the thread + * context classloader if the classloader is not null. + * @param classLoader + * @param conf + */ + public static void setClassLoader(ClassLoader classLoader, + Configuration conf) { + if (classLoader != null) { + LOG.info("Setting classloader " + classLoader.getClass().getName() + + " on the configuration and as the thread context classloader"); + conf.setClassLoader(classLoader); + Thread.currentThread().setContextClassLoader(classLoader); + } + } + + @VisibleForTesting + static String[] getSystemClasses(Configuration conf) { + return conf.getTrimmedStrings( + MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES); } private static ClassLoader createJobClassLoader(final String appClasspath, Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java Tue Aug 19 23:49:39 2014 @@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.v2.jo import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.ipc.RPCUtil; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -105,11 +106,15 @@ public class MRWebAppUtil { public static InetSocketAddress getJHSWebBindAddress(Configuration conf) { if (httpPolicyInJHS == Policy.HTTPS_ONLY) { - return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS, + return conf.getSocketAddr( + JHAdminConfig.MR_HISTORY_BIND_HOST, + JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT); } else { - return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, + return conf.getSocketAddr( + JHAdminConfig.MR_HISTORY_BIND_HOST, + JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java Tue Aug 19 23:49:39 2014 @@ -23,16 +23,24 @@ import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.junit.After; import org.junit.Assert; import org.junit.Test; public class TestJobClient { - final static String TEST_DIR = new File(System.getProperty("test.build.data", - "/tmp")).getAbsolutePath(); + + final static String TEST_DIR = new File("target", + TestJobClient.class.getSimpleName()).getAbsolutePath(); + + @After + public void tearDown() { + FileUtil.fullyDelete(new File(TEST_DIR)); + } @Test public void testGetClusterStatusWithLocalJobRunner() throws Exception { @@ -51,11 +59,12 @@ public class TestJobClient { Assert.assertEquals(0, blackListedTrackersInfo.size()); } - @Test(timeout = 1000) + @Test(timeout = 10000) public void testIsJobDirValid() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.getLocal(conf); Path testDir = new Path(TEST_DIR); + fs.mkdirs(testDir); Assert.assertFalse(JobClient.isJobDirValid(testDir, fs)); Path jobconf = new Path(testDir, "job.xml"); @@ -68,7 +77,7 @@ public class TestJobClient { fs.delete(jobsplit, true); } - @Test(timeout = 1000) + @Test(timeout = 10000) public void testGetStagingAreaDir() throws IOException, InterruptedException { Configuration conf = new Configuration(); JobClient client = new JobClient(conf); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java Tue Aug 19 23:49:39 2014 @@ -18,7 +18,7 @@ package org.apache.hadoop.mapred; -import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertNotNull; import java.io.IOException; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Aug 19 23:49:39 2014 @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -import junit.framework.Assert; +import org.junit.Assert; import junit.framework.TestCase; import org.apache.commons.logging.Log; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java Tue Aug 19 23:49:39 2014 @@ -22,7 +22,7 @@ package org.apache.hadoop.mapreduce.v2; import java.io.IOException; import java.net.InetSocketAddress; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.Server; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java Tue Aug 19 23:49:39 2014 @@ -18,7 +18,7 @@ package org.apache.hadoop.mapreduce.v2; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Tue Aug 19 23:49:39 2014 @@ -39,6 +39,17 @@ public class TestFileNameIndexUtils { + FileNameIndexUtils.DELIMITER + "%s" + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION; + private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION; + private static final String JOB_HISTORY_FILE_FORMATTER = "%s" + FileNameIndexUtils.DELIMITER + "%s" + FileNameIndexUtils.DELIMITER + "%s" @@ -236,6 +247,22 @@ public class TestFileNameIndexUtils { } @Test + public void testJobStartTimeBackwardsCompatible() throws IOException{ + String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME, + JOB_ID, + SUBMIT_TIME, + USER_NAME, + JOB_NAME_WITH_DELIMITER_ESCAPE, + FINISH_TIME, + NUM_MAPS, + NUM_REDUCES, + JOB_STATUS, + QUEUE_NAME ); + JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile); + Assert.assertEquals(info.getJobStartTime(), info.getSubmitTime()); + } + + @Test public void testJobHistoryFileNameBackwardsCompatible() throws IOException { JobID oldJobId = JobID.forName(JOB_ID); JobId jobId = TypeConverter.toYarn(oldJobId); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Tue Aug 19 23:49:39 2014 @@ -33,6 +33,7 @@ import java.io.IOException; import java.net.URI; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.ApplicationClassLoader; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -400,7 +402,7 @@ public class TestMRApps { URI file = new URI("mockfs://mock/tmp/something.zip#something"); Path filePath = new Path(file); URI file2 = new URI("mockfs://mock/tmp/something.txt#something"); - Path file2Path = new Path(file); + Path file2Path = new Path(file2); when(mockFs.resolvePath(filePath)).thenReturn(filePath); when(mockFs.resolvePath(file2Path)).thenReturn(file2Path); @@ -492,4 +494,36 @@ public class TestMRApps { assertTrue(MRApps.TaskStateUI.COMPLETED.correspondsTo(TaskState.KILLED)); assertTrue(MRApps.TaskStateUI.RUNNING.correspondsTo(TaskState.RUNNING)); } + + + private static final String[] SYS_CLASSES = new String[] { + "/java/fake/Klass", + "/javax/fake/Klass", + "/org/apache/commons/logging/fake/Klass", + "/org/apache/log4j/fake/Klass", + "/org/apache/hadoop/fake/Klass" + }; + + private static final String[] DEFAULT_XMLS = new String[] { + "core-default.xml", + "mapred-default.xml", + "hdfs-default.xml", + "yarn-default.xml" + }; + + @Test + public void testSystemClasses() { + final List<String> systemClasses = + Arrays.asList(MRApps.getSystemClasses(new Configuration())); + for (String defaultXml : DEFAULT_XMLS) { + assertTrue(defaultXml + " must be system resource", + ApplicationClassLoader.isSystemClass(defaultXml, systemClasses)); + } + for (String klass : SYS_CLASSES) { + assertTrue(klass + " must be system class", + ApplicationClassLoader.isSystemClass(klass, systemClasses)); + } + assertFalse("/fake/Klass must not be a system class", + ApplicationClassLoader.isSystemClass("/fake/Klass", systemClasses)); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Tue Aug 19 23:49:39 2014 @@ -85,6 +85,16 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude> + <exclude>src/test/resources/testBOM.txt</exclude> + </excludes> + </configuration> + </plugin> </plugins> </build> </project> Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Aug 19 23:49:39 2014 @@ -92,11 +92,11 @@ } }, {"name": "jobQueueName", "type": "string"}, - {"name": "workflowId", "type": "string"}, - {"name": "workflowName", "type": "string"}, - {"name": "workflowNodeName", "type": "string"}, - {"name": "workflowAdjacencies", "type": "string"}, - {"name": "workflowTags", "type": "string"} + {"name": "workflowId", "type": ["null","string"], "default": null}, + {"name": "workflowName", "type": ["null","string"], "default": null}, + {"name": "workflowNodeName", "type": ["null","string"], "default": null}, + {"name": "workflowAdjacencies", "type": ["null","string"], "default": null}, + {"name": "workflowTags", "type": ["null","string"], "default": null} ] }, @@ -136,7 +136,7 @@ {"name": "finishedMaps", "type": "int"}, {"name": "finishedReduces", "type": "int"}, {"name": "jobStatus", "type": "string"}, - {"name": "diagnostics", "type": "string"} + {"name": "diagnostics", "type": ["null","string"], "default": null} ] }, @@ -205,8 +205,8 @@ {"name": "httpPort", "type": "int"}, {"name": "shufflePort", "type": "int"}, {"name": "containerId", "type": "string"}, - {"name": "locality", "type": "string"}, - {"name": "avataar", "type": "string"} + {"name": "locality", "type": ["null","string"], "default": null}, + {"name": "avataar", "type": ["null","string"], "default": null} ] }, @@ -221,7 +221,7 @@ {"name": "rackname", "type": "string"}, {"name": "status", "type": "string"}, {"name": "error", "type": "string"}, - {"name": "counters", "type": "JhCounters"}, + {"name": "counters", "type": ["null","JhCounters"], "default": null}, {"name": "clockSplits", "type": { "type": "array", "items": "int"}}, {"name": "cpuUsages", "type": { "type": "array", "items": "int"}}, {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}}, @@ -237,7 +237,7 @@ {"name": "error", "type": "string"}, {"name": "failedDueToAttempt", "type": ["null", "string"] }, {"name": "status", "type": "string"}, - {"name": "counters", "type": "JhCounters"} + {"name": "counters", "type": ["null","JhCounters"], "default": null} ] }, @@ -248,7 +248,7 @@ {"name": "finishTime", "type": "long"}, {"name": "status", "type": "string"}, {"name": "counters", "type": "JhCounters"}, - {"name": "successfulAttemptId", "type": "string"} + {"name": "successfulAttemptId", "type": ["null","string"], "default": null} ] }, Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Tue Aug 19 23:49:39 2014 @@ -295,6 +295,15 @@ public abstract class FileInputFormat<K, String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ @@ -337,22 +346,22 @@ public abstract class FileInputFormat<K, long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); + splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); + splitHosts[0], splitHosts[1])); } } else { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files @@ -538,10 +547,30 @@ public abstract class FileInputFormat<K, * @param blkLocations The list of block locations * @param offset * @param splitSize - * @return array of hosts that contribute most to this split + * @return an array of hosts that contribute most to this split * @throws IOException */ protected String[] getSplitHosts(BlockLocation[] blkLocations, + long offset, long splitSize, NetworkTopology clusterMap) throws IOException { + return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, + clusterMap)[0]; + } + + /** + * This function identifies and returns the hosts that contribute + * most for a given split. For calculating the contribution, rack + * locality is treated on par with host locality, so hosts from racks + * that contribute the most are preferred over hosts on racks that + * contribute less + * @param blkLocations The list of block locations + * @param offset + * @param splitSize + * @return two arrays - one of hosts that contribute most to this split, and + * one of hosts that contribute most to this split that have the data + * cached on them + * @throws IOException + */ + private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) throws IOException { @@ -552,7 +581,8 @@ public abstract class FileInputFormat<K, //If this is the only block, just return if (bytesInThisBlock >= splitSize) { - return blkLocations[startIndex].getHosts(); + return new String[][] { blkLocations[startIndex].getHosts(), + blkLocations[startIndex].getCachedHosts() }; } long bytesInFirstBlock = bytesInThisBlock; @@ -639,7 +669,9 @@ public abstract class FileInputFormat<K, } // for all indices - return identifyHosts(allTopos.length, racksMap); + // We don't yet support cached hosts when bytesInThisBlock > splitSize + return new String[][] { identifyHosts(allTopos.length, racksMap), + new String[0]}; } private String[] identifyHosts(int replicationFactor, Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Aug 19 23:49:39 2014 @@ -184,10 +184,16 @@ public class FileOutputCommitter extends } @Override + @Deprecated public boolean isRecoverySupported() { return true; } - + + @Override + public boolean isRecoverySupported(JobContext context) throws IOException { + return getWrapped(context).isRecoverySupported(context); + } + @Override public void recoverTask(TaskAttemptContext context) throws IOException { Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Tue Aug 19 23:49:39 2014 @@ -24,6 +24,7 @@ import java.io.DataOutput; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; /** A section of an input file. Returned by {@link @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Public @InterfaceStability.Stable public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit - implements InputSplit { + implements InputSplitWithLocationInfo { org.apache.hadoop.mapreduce.lib.input.FileSplit fs; protected FileSplit() { fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); @@ -62,6 +63,20 @@ public class FileSplit extends org.apach length, hosts); } + /** Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts, inMemoryHosts); + } + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { this.fs = fs; } @@ -92,4 +107,9 @@ public class FileSplit extends org.apach return fs.getLocations(); } + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return fs.getLocationInfo(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java Tue Aug 19 23:49:39 2014 @@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem; * bytes, of the input files. However, the {@link FileSystem} blocksize of * the input files is treated as an upper bound for input splits. A lower bound * on the split size can be set via - * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize"> + * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize"> * mapreduce.input.fileinputformat.split.minsize</a>.</p> * * <p>Clearly, logical splits based on input-size is insufficient for many Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Tue Aug 19 23:49:39 2014 @@ -112,7 +112,7 @@ import org.apache.log4j.Level; @InterfaceAudience.Public @InterfaceStability.Stable public class JobConf extends Configuration { - + private static final Log LOG = LogFactory.getLog(JobConf.class); static{ @@ -882,7 +882,7 @@ public class JobConf extends Configurati JobContext.KEY_COMPARATOR, null, RawComparator.class); if (theClass != null) return ReflectionUtils.newInstance(theClass, this); - return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); + return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this); } /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Aug 19 23:49:39 2014 @@ -184,7 +184,7 @@ public class LineRecordReader implements private int maxBytesToConsume(long pos) { return isCompressedInput() ? Integer.MAX_VALUE - : (int) Math.min(Integer.MAX_VALUE, end - pos); + : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength); } private long getFilePosition() throws IOException { @@ -197,6 +197,39 @@ public class LineRecordReader implements return retVal; } + private int skipUtfByteOrderMark(Text value) throws IOException { + // Strip BOM(Byte Order Mark) + // Text only support UTF-8, we only need to check UTF-8 BOM + // (0xEF,0xBB,0xBF) at the start of the text stream. + int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength, + Integer.MAX_VALUE); + int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos)); + // Even we read 3 extra bytes for the first line, + // we won't alter existing behavior (no backwards incompat issue). + // Because the newSize is less than maxLineLength and + // the number of bytes copied to Text is always no more than newSize. + // If the return size from readLine is not less than maxLineLength, + // we will discard the current line and read the next line. + pos += newSize; + int textLength = value.getLength(); + byte[] textBytes = value.getBytes(); + if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) && + (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) { + // find UTF-8 BOM, strip it. + LOG.info("Found UTF-8 BOM and skipped it"); + textLength -= 3; + newSize -= 3; + if (textLength > 0) { + // It may work to use the same buffer and not do the copyBytes + textBytes = value.copyBytes(); + value.set(textBytes, 3, textLength); + } else { + value.clear(); + } + } + return newSize; + } + /** Read a line. */ public synchronized boolean next(LongWritable key, Text value) throws IOException { @@ -206,12 +239,17 @@ public class LineRecordReader implements while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { key.set(pos); - int newSize = in.readLine(value, maxLineLength, - Math.max(maxBytesToConsume(pos), maxLineLength)); + int newSize = 0; + if (pos == 0) { + newSize = skipUtfByteOrderMark(value); + } else { + newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); + pos += newSize; + } + if (newSize == 0) { return false; } - pos += newSize; if (newSize < maxLineLength) { return true; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug 19 23:49:39 2014 @@ -537,6 +537,8 @@ public class Merger { } } minSegment = top(); + long startPos = minSegment.getPosition(); + key = minSegment.getKey(); if (!minSegment.inMemory()) { //When we load the value from an inmemory segment, we reset //the "value" DIB in this class to the inmem segment's byte[]. @@ -547,11 +549,11 @@ public class Merger { //segment, we reset the "value" DIB to the byte[] in that (so //we reuse the disk segment DIB whenever we consider //a disk segment). + minSegment.getValue(diskIFileValue); value.reset(diskIFileValue.getData(), diskIFileValue.getLength()); + } else { + minSegment.getValue(value); } - long startPos = minSegment.getPosition(); - key = minSegment.getKey(); - minSegment.getValue(value); long endPos = minSegment.getPosition(); totalBytesProcessed += endPos - startPos; mergeProgress.set(totalBytesProcessed * progPerByte); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Aug 19 23:49:39 2014 @@ -176,15 +176,35 @@ public abstract class OutputCommitter /** * This method implements the new interface by calling the old method. Note - * that the input types are different between the new and old apis and this - * is a bridge between the two. + * that the input types are different between the new and old apis and this is + * a bridge between the two. + * + * @deprecated Use {@link #isRecoverySupported(JobContext)} instead. */ + @Deprecated @Override public boolean isRecoverySupported() { return false; } /** + * Is task output recovery supported for restarting jobs? + * + * If task output recovery is supported, job restart can be done more + * efficiently. + * + * @param jobContext + * Context of the job whose output is being written. + * @return <code>true</code> if task output recovery is supported, + * <code>false</code> otherwise + * @throws IOException + * @see #recoverTask(TaskAttemptContext) + */ + public boolean isRecoverySupported(JobContext jobContext) throws IOException { + return isRecoverySupported(); + } + + /** * Recover the task output. * * The retry-count for the job will be passed via the @@ -315,4 +335,15 @@ public abstract class OutputCommitter recoverTask((TaskAttemptContext) taskContext); } + /** + * This method implements the new interface by calling the old method. Note + * that the input types are different between the new and old apis and this is + * a bridge between the two. + */ + @Override + public final boolean isRecoverySupported( + org.apache.hadoop.mapreduce.JobContext context) throws IOException { + return isRecoverySupported((JobContext) context); + } + } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Aug 19 23:49:39 2014 @@ -66,6 +66,7 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringUtils; @@ -322,6 +323,11 @@ abstract public class Task implements Wr protected void reportFatalError(TaskAttemptID id, Throwable throwable, String logMsg) { LOG.fatal(logMsg); + + if (ShutdownHookManager.get().isShutdownInProgress()) { + return; + } + Throwable tCause = throwable.getCause(); String cause = tCause == null ? StringUtils.stringifyException(throwable) @@ -1120,8 +1126,8 @@ abstract public class Task implements Wr if (isMapTask() && conf.getNumReduceTasks() > 0) { try { Path mapOutput = mapOutputFile.getOutputFile(); - FileSystem fs = mapOutput.getFileSystem(conf); - return fs.getFileStatus(mapOutput).getLen(); + FileSystem localFS = FileSystem.getLocal(conf); + return localFS.getFileStatus(mapOutput).getLen(); } catch (IOException e) { LOG.warn ("Could not find output size " , e); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Aug 19 23:49:39 2014 @@ -90,8 +90,8 @@ public class TaskCompletionEvent } /** - * Returns enum Status.SUCESS or Status.FAILURE. - * @return task tracker status + * Returns {@link Status} + * @return task completion status */ public Status getTaskStatus() { return Status.valueOf(super.getStatus().name()); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Tue Aug 19 23:49:39 2014 @@ -199,16 +199,18 @@ public class TaskLog { // file first and then rename. File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup); - BufferedOutputStream bos = - new BufferedOutputStream( - SecureIOUtils.createForWrite(tmpIndexFile, 0644)); - DataOutputStream dos = new DataOutputStream(bos); - //the format of the index file is - //LOG_DIR: <the dir where the task logs are really stored> - //STDOUT: <start-offset in the stdout file> <length> - //STDERR: <start-offset in the stderr file> <length> - //SYSLOG: <start-offset in the syslog file> <length> + BufferedOutputStream bos = null; + DataOutputStream dos = null; try{ + bos = new BufferedOutputStream( + SecureIOUtils.createForWrite(tmpIndexFile, 0644)); + dos = new DataOutputStream(bos); + //the format of the index file is + //LOG_DIR: <the dir where the task logs are really stored> + //STDOUT: <start-offset in the stdout file> <length> + //STDERR: <start-offset in the stderr file> <length> + //SYSLOG: <start-offset in the syslog file> <length> + dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n" + LogName.STDOUT.toString() + ":"); dos.writeBytes(Long.toString(prevOutLength) + " "); @@ -225,8 +227,10 @@ public class TaskLog { + "\n"); dos.close(); dos = null; + bos.close(); + bos = null; } finally { - IOUtils.cleanup(LOG, dos); + IOUtils.cleanup(LOG, dos, bos); } File indexFile = getIndexFile(currentTaskid, isCleanup); Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Tue Aug 19 23:49:39 2014 @@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.Reporter /** * An InputFormat capable of performing joins over a set of data sources sorted * and partitioned the same way. - * @see #setFormat * * A user may define new join types by setting the property * <tt>mapred.join.define.<ident></tt> to a classname. In the expression @@ -44,6 +43,7 @@ import org.apache.hadoop.mapred.Reporter * ComposableRecordReader. * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys * in the join. + * @see #setFormat * @see JoinRecordReader * @see MultiFilterRecordReader */ Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Tue Aug 19 23:49:39 2014 @@ -131,7 +131,7 @@ public abstract class CompositeRecordRea public void add(ComposableRecordReader<K,? extends V> rr) throws IOException { kids[rr.id()] = rr; if (null == q) { - cmp = WritableComparator.get(rr.createKey().getClass()); + cmp = WritableComparator.get(rr.createKey().getClass(), conf); q = new PriorityQueue<ComposableRecordReader<K,?>>(3, new Comparator<ComposableRecordReader<K,?>>() { public int compare(ComposableRecordReader<K,?> o1,