Author: omalley
Date: Fri Mar 4 04:40:22 2011
New Revision: 1077650

URL: http://svn.apache.org/viewvc?rev=1077650&view=rev
Log:
commit 516430ffecf8e7c090920514d8ade7cf6bb79b7e
Author: Vinay Kumar Thota <vin...@yahoo-inc.com>
Date:   Sun Aug 8 20:38:56 2010 +0000
3867536 Fix the instability of MR system tests.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java?rev=1077650&r1=1077649&r2=1077650&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestChildsKillingOfSuspendTask.java Fri Mar 4 04:40:22 2011
@@ -1,312 +1,319 @@
-/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.hadoop.mapred; -import java.io.DataOutputStream; -import java.io.IOException; - -import org.junit.Test; -import org.junit.Assert; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.Log; - -import java.util.Collection; -import java.util.Hashtable; - -import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; -import org.apache.hadoop.mapreduce.test.system.JTProtocol; -import org.apache.hadoop.mapreduce.test.system.JobInfo; -import org.apache.hadoop.mapreduce.test.system.MRCluster; -import org.apache.hadoop.mapreduce.test.system.TTClient; -import org.apache.hadoop.mapreduce.test.system.JTClient; -import org.apache.hadoop.mapreduce.test.system.TTProtocol; -import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; -import org.apache.hadoop.mapreduce.test.system.TaskInfo; -import testjar.GenerateTaskChildProcess; - -public class TestChildsKillingOfSuspendTask { - private static final Log LOG = LogFactory - .getLog(TestChildsKillingOfSuspendTask.class); - private static Configuration conf = new Configuration(); - private static MRCluster cluster; - private static Path inputDir = new Path("input"); - private static Path outputDir = new Path("output"); - private static String confFile = "mapred-site.xml"; - - @BeforeClass - public static void before() throws Exception { - Hashtable<String,Object> prop = new Hashtable<String,Object>(); - prop.put("mapred.map.max.attempts",1L); - prop.put("mapred.task.timeout",30000L); - prop.put("mapreduce.job.complete.cancel.delegation.tokens", false); - String [] expExcludeList = {"java.net.ConnectException", - "java.io.IOException"}; - cluster = MRCluster.createCluster(conf); - cluster.setExcludeExpList(expExcludeList); - cluster.setUp(); - cluster.restartClusterWithNewConfig(prop, confFile); - UtilsForTests.waitFor(1000); - conf = cluster.getJTClient().getProxy().getDaemonConf(); - createInput(inputDir, conf); - } - @AfterClass - public static void after() throws Exception { - cleanup(inputDir, conf); - cleanup(outputDir, conf); - cluster.tearDown(); - cluster.restart(); - } - - /** - * Verify the process tree clean up of a task after - * task is suspended and wait till the task is - * terminated based on timeout. 
- */ - @Test - public void testProcessTreeCleanupOfSuspendTask() throws - IOException { - TaskInfo taskInfo = null; - TaskID tID = null; - TTTaskInfo [] ttTaskinfo = null; - String pid = null; - TTProtocol ttIns = null; - TTClient ttClientIns = null; - int counter = 0; - - JobConf jobConf = new JobConf(conf); - jobConf.setJobName("Message Display"); - jobConf.setJarByClass(GenerateTaskChildProcess.class); - jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); - jobConf.setNumMapTasks(1); - jobConf.setNumReduceTasks(0); - jobConf.setMaxMapAttempts(1); - cleanup(outputDir, conf); - FileInputFormat.setInputPaths(jobConf, inputDir); - FileOutputFormat.setOutputPath(jobConf, outputDir); - - JTClient jtClient = cluster.getJTClient(); - JobClient client = jtClient.getClient(); - JTProtocol wovenClient = cluster.getJTClient().getProxy(); - RunningJob runJob = client.submitJob(jobConf); - JobID id = runJob.getID(); - JobInfo jInfo = wovenClient.getJobInfo(id); - Assert.assertNotNull("Job information is null",jInfo); - - Assert.assertTrue("Job has not been started for 1 min.", - jtClient.isJobStarted(id)); - - TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); - for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { - taskInfo = taskinfo; - break; - } - } - - Assert.assertTrue("Task has not been started for 1 min.", - jtClient.isTaskStarted(taskInfo)); - - tID = TaskID.downgrade(taskInfo.getTaskID()); - TaskAttemptID tAttID = new TaskAttemptID(tID,0); - FinishTaskControlAction action = new FinishTaskControlAction(tID); - - Collection<TTClient> ttClients = cluster.getTTClients(); - for (TTClient ttClient : ttClients) { - TTProtocol tt = ttClient.getProxy(); - tt.sendAction(action); - ttTaskinfo = tt.getTasks(); - for (TTTaskInfo tttInfo : ttTaskinfo) { - if (!tttInfo.isTaskCleanupTask()) { - pid = tttInfo.getPid(); - ttClientIns = ttClient; - ttIns = tt; - break; - } - } - if (ttClientIns != null) { - break; - } - } - Assert.assertTrue("Map process tree is not alive before task suspend.", - ttIns.isProcessTreeAlive(pid)); - LOG.info("Suspend the task of process id " + pid); - boolean exitCode = ttIns.suspendProcess(pid); - Assert.assertTrue("Process(" + pid + ") has not been suspended", - exitCode); - - LOG.info("Waiting till the task is failed..."); - taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); - counter = 0; - while (counter < 60) { - if (taskInfo.getTaskStatus().length > 0) { - if (taskInfo.getTaskStatus()[0].getRunState() == - TaskStatus.State.FAILED) { - break; - } - } - UtilsForTests.waitFor(1000); - taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); - counter ++; - } - Assert.assertTrue("Suspended task is failed " - + "before the timeout interval.", counter > 30 && - taskInfo.getTaskStatus()[0].getRunState() == TaskStatus.State.FAILED); - - LOG.info("Waiting till the job is completed..."); - counter = 0; - while (counter < 60) { - if (jInfo.getStatus().isJobComplete()) { - break; - } - UtilsForTests.waitFor(1000); - jInfo = wovenClient.getJobInfo(id); - counter ++; - } - Assert.assertTrue("Job has not been completed for 1 min.", - counter != 60); - ttIns = ttClientIns.getProxy(); - UtilsForTests.waitFor(1000); - Assert.assertTrue("Map process is still alive after task has been failed.", - !ttIns.isProcessTreeAlive(pid)); - } - - /** - * Verify the process tree cleanup of task after task - * is suspended and resumed the task before the timeout. 
- */ - @Test - public void testProcessTreeCleanupOfSuspendAndResumeTask() throws - IOException { - TaskInfo taskInfo = null; - TaskID tID = null; - TTTaskInfo [] ttTaskinfo = null; - String pid = null; - TTProtocol ttIns = null; - TTClient ttClientIns = null; - int counter = 0; - - JobConf jobConf = new JobConf(conf); - jobConf.setJobName("Message Display"); - jobConf.setJarByClass(GenerateTaskChildProcess.class); - jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); - jobConf.setNumMapTasks(1); - jobConf.setNumReduceTasks(0); - jobConf.setMaxMapAttempts(1); - cleanup(outputDir, conf); - FileInputFormat.setInputPaths(jobConf, inputDir); - FileOutputFormat.setOutputPath(jobConf, outputDir); - - JTClient jtClient = cluster.getJTClient(); - JobClient client = jtClient.getClient(); - JTProtocol wovenClient = cluster.getJTClient().getProxy(); - RunningJob runJob = client.submitJob(jobConf); - JobID id = runJob.getID(); - JobInfo jInfo = wovenClient.getJobInfo(id); - Assert.assertNotNull("Job information is null",jInfo); - - Assert.assertTrue("Job has not been started for 1 min.", - jtClient.isJobStarted(id)); - - TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); - for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { - taskInfo = taskinfo; - break; - } - } - - Assert.assertTrue("Task has not been started for 1 min.", - jtClient.isTaskStarted(taskInfo)); - - tID = TaskID.downgrade(taskInfo.getTaskID()); - TaskAttemptID tAttID = new TaskAttemptID(tID,0); - FinishTaskControlAction action = new FinishTaskControlAction(tID); - - Collection<TTClient> ttClients = cluster.getTTClients(); - for (TTClient ttClient : ttClients) { - TTProtocol tt = ttClient.getProxy(); - tt.sendAction(action); - ttTaskinfo = tt.getTasks(); - for (TTTaskInfo tttInfo : ttTaskinfo) { - if (!tttInfo.isTaskCleanupTask()) { - pid = tttInfo.getPid(); - ttClientIns = ttClient; - ttIns = tt; - break; - } - } - if (ttClientIns != null) { - break; - } - } - Assert.assertTrue("Map process tree is not alive before task suspend.", - ttIns.isProcessTreeAlive(pid)); - LOG.info("Suspend the task of process id " + pid); - boolean exitCode = ttIns.suspendProcess(pid); - Assert.assertTrue("Process(" + pid + ") has not been suspended", - exitCode); - Assert.assertTrue("Map process is not alive after task " - + "has been suspended.", ttIns.isProcessTreeAlive(pid)); - UtilsForTests.waitFor(5000); - exitCode = ttIns.resumeProcess(pid); - Assert.assertTrue("Suspended process(" + pid + ") has not been resumed", - exitCode); - UtilsForTests.waitFor(35000); - taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID()); - Assert.assertTrue("Suspended task has not been resumed", - taskInfo.getTaskStatus()[0].getRunState() == - TaskStatus.State.RUNNING); - UtilsForTests.waitFor(1000); - Assert.assertTrue("Map process tree is not alive after task is resumed.", - ttIns.isProcessTreeAlive(pid)); - } - - private static void cleanup(Path dir, Configuration conf) throws - IOException { - FileSystem fs = dir.getFileSystem(conf); - fs.delete(dir, true); - } - - private static void createInput(Path inDir, Configuration conf) throws - IOException { - String input = "Hadoop is framework for data intensive distributed " - + "applications.\n Hadoop enables applications " - + "to work with thousands of nodes."; - FileSystem fs = inDir.getFileSystem(conf); - if (!fs.mkdirs(inDir)) { - throw new IOException("Failed to create the input directory:" - + inDir.toString()); - } - fs.setPermission(inDir, new 
FsPermission(FsAction.ALL, - FsAction.ALL, FsAction.ALL)); - DataOutputStream file = fs.create(new Path(inDir, "data.txt")); - int i = 0; - while(i < 10) { - file.writeBytes(input); - i++; - } - file.close(); - } - -} +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.junit.Test; +import org.junit.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.Log; +import org.apache.hadoop.common.RemoteExecution; + +import java.util.Collection; +import java.util.Hashtable; + +import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; +import org.apache.hadoop.mapreduce.test.system.JTProtocol; +import org.apache.hadoop.mapreduce.test.system.JobInfo; +import org.apache.hadoop.mapreduce.test.system.MRCluster; +import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.mapreduce.test.system.JTClient; +import org.apache.hadoop.mapreduce.test.system.TTProtocol; +import org.apache.hadoop.mapreduce.test.system.TTTaskInfo; +import org.apache.hadoop.mapreduce.test.system.TaskInfo; +import testjar.GenerateTaskChildProcess; + +public class TestChildsKillingOfSuspendTask { + private static final Log LOG = LogFactory + .getLog(TestChildsKillingOfSuspendTask.class); + private static Configuration conf = new Configuration(); + private static MRCluster cluster; + private static Path inputDir = new Path("input"); + private static Path outputDir = new Path("output"); + private static String confFile = "mapred-site.xml"; + + @BeforeClass + public static void before() throws Exception { + Hashtable<String,Object> prop = new Hashtable<String,Object>(); + prop.put("mapred.map.max.attempts",1L); + prop.put("mapred.task.timeout",30000L); + prop.put("mapreduce.job.complete.cancel.delegation.tokens", false); + String [] expExcludeList = {"java.net.ConnectException", + "java.io.IOException","org.apache.hadoop.metrics2.MetricsException"}; + cluster = MRCluster.createCluster(conf); + cluster.setExcludeExpList(expExcludeList); + cluster.setUp(); + cluster.restartClusterWithNewConfig(prop, confFile); + UtilsForTests.waitFor(1000); + conf = cluster.getJTClient().getProxy().getDaemonConf(); + createInput(inputDir, conf); + } + @AfterClass + public static void after() throws Exception { + cleanup(inputDir, conf); + cleanup(outputDir, conf); + cluster.tearDown(); + // cluster.restart(); + } + + /** + * Verify the 
process tree clean up of a task after + * task is suspended and wait till the task is + * terminated based on timeout. + */ + @Test + public void testProcessTreeCleanupOfSuspendTask() throws + Exception { + TaskInfo taskInfo = null; + TaskID tID = null; + TTTaskInfo [] ttTaskinfo = null; + String pid = null; + TTProtocol ttIns = null; + TTClient ttClientIns = null; + int counter = 0; + + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("Message Display"); + jobConf.setJarByClass(GenerateTaskChildProcess.class); + jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(0); + jobConf.setMaxMapAttempts(1); + cleanup(outputDir, conf); + FileInputFormat.setInputPaths(jobConf, inputDir); + FileOutputFormat.setOutputPath(jobConf, outputDir); + + JTClient jtClient = cluster.getJTClient(); + JobClient client = jtClient.getClient(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); + RunningJob runJob = client.submitJob(jobConf); + JobID id = runJob.getID(); + JobInfo jInfo = wovenClient.getJobInfo(id); + Assert.assertNotNull("Job information is null",jInfo); + + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); + JobStatus[] jobStatus = client.getAllJobs(); + String userName = jobStatus[0].getUsername(); + + TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + break; + } + } + + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); + + tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID tAttID = new TaskAttemptID(tID,0); + FinishTaskControlAction action = new FinishTaskControlAction(tID); + + Collection<TTClient> ttClients = cluster.getTTClients(); + for (TTClient ttClient : ttClients) { + TTProtocol tt = ttClient.getProxy(); + tt.sendAction(action); + ttTaskinfo = tt.getTasks(); + for (TTTaskInfo tttInfo : ttTaskinfo) { + if (!tttInfo.isTaskCleanupTask()) { + pid = tttInfo.getPid(); + ttClientIns = ttClient; + ttIns = tt; + break; + } + } + if (ttClientIns != null) { + break; + } + } + Assert.assertTrue("Map process tree is not alive before task suspend.", + ttIns.isProcessTreeAlive(pid)); + LOG.info("Suspend the task of process id " + pid); + ExecuteShellCommand execcmd = new ExecuteShellCommand(userName, + ttClientIns.getHostName(), "kill -SIGSTOP " + pid); + execcmd.start(); + execcmd.join(); + UtilsForTests.waitFor(30000); + Assert.assertTrue("Process(" + pid + ") has not been suspended", + execcmd.getStatus()); + ttIns = ttClientIns.getProxy(); + UtilsForTests.waitFor(1000); + Assert.assertTrue("Map process is still alive after task has been failed.", + !ttIns.isProcessTreeAlive(pid)); + } + + /** + * Verify the process tree cleanup of task after task + * is suspended and resumed the task before the timeout. 
+ */ + @Test + public void testProcessTreeCleanupOfSuspendAndResumeTask() throws + Exception { + TaskInfo taskInfo = null; + TaskID tID = null; + TTTaskInfo [] ttTaskinfo = null; + String pid = null; + TTProtocol ttIns = null; + TTClient ttClientIns = null; + int counter = 0; + + JobConf jobConf = new JobConf(conf); + jobConf.setJobName("Message Display"); + jobConf.setJarByClass(GenerateTaskChildProcess.class); + jobConf.setMapperClass(GenerateTaskChildProcess.StrDisplayMapper.class); + jobConf.setNumMapTasks(1); + jobConf.setNumReduceTasks(0); + jobConf.setMaxMapAttempts(1); + cleanup(outputDir, conf); + FileInputFormat.setInputPaths(jobConf, inputDir); + FileOutputFormat.setOutputPath(jobConf, outputDir); + + JTClient jtClient = cluster.getJTClient(); + JobClient client = jtClient.getClient(); + JTProtocol wovenClient = cluster.getJTClient().getProxy(); + RunningJob runJob = client.submitJob(jobConf); + JobID id = runJob.getID(); + JobInfo jInfo = wovenClient.getJobInfo(id); + Assert.assertNotNull("Job information is null",jInfo); + + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); + + JobStatus[] jobStatus = client.getAllJobs(); + String userName = jobStatus[0].getUsername(); + + TaskInfo[] taskInfos = wovenClient.getTaskInfo(id); + for (TaskInfo taskinfo : taskInfos) { + if (!taskinfo.isSetupOrCleanup()) { + taskInfo = taskinfo; + break; + } + } + + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); + + tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID tAttID = new TaskAttemptID(tID,0); + FinishTaskControlAction action = new FinishTaskControlAction(tID); + + Collection<TTClient> ttClients = cluster.getTTClients(); + for (TTClient ttClient : ttClients) { + TTProtocol tt = ttClient.getProxy(); + tt.sendAction(action); + ttTaskinfo = tt.getTasks(); + for (TTTaskInfo tttInfo : ttTaskinfo) { + if (!tttInfo.isTaskCleanupTask()) { + pid = tttInfo.getPid(); + ttClientIns = ttClient; + ttIns = tt; + break; + } + } + if (ttClientIns != null) { + break; + } + } + Assert.assertTrue("Map process tree is not alive before task suspend.", + ttIns.isProcessTreeAlive(pid)); + LOG.info("Suspend the task of process id " + pid); + ExecuteShellCommand execcmd = new ExecuteShellCommand(userName, + ttClientIns.getHostName(), "kill -SIGSTOP " + pid); + execcmd.start(); + execcmd.join(); + + Assert.assertTrue("Process(" + pid + ") has not been suspended", + execcmd.getStatus()); + Assert.assertTrue("Map process is not alive after task " + + "has been suspended.", ttIns.isProcessTreeAlive(pid)); + UtilsForTests.waitFor(5000); + ExecuteShellCommand execcmd1 = new ExecuteShellCommand(userName, + ttClientIns.getHostName(), "kill -SIGCONT " + pid); + execcmd1.start(); + execcmd1.join(); + Assert.assertTrue("Suspended process(" + pid + ") has not been resumed", + execcmd1.getStatus()); + UtilsForTests.waitFor(5000); + Assert.assertTrue("Map process tree is not alive after task is resumed.", + ttIns.isProcessTreeAlive(pid)); + } + + private static void cleanup(Path dir, Configuration conf) throws + IOException { + FileSystem fs = dir.getFileSystem(conf); + fs.delete(dir, true); + } + + private static void createInput(Path inDir, Configuration conf) throws + IOException { + String input = "Hadoop is framework for data intensive distributed " + + "applications.\n Hadoop enables applications " + + "to work with thousands of nodes."; + FileSystem fs = inDir.getFileSystem(conf); + if (!fs.mkdirs(inDir)) { + throw new 
IOException("Failed to create the input directory:" + + inDir.toString()); + } + fs.setPermission(inDir, new FsPermission(FsAction.ALL, + FsAction.ALL, FsAction.ALL)); + DataOutputStream file = fs.create(new Path(inDir, "data.txt")); + int i = 0; + while(i < 10) { + file.writeBytes(input); + i++; + } + file.close(); + } + + class ExecuteShellCommand extends Thread { + String userName; + String cmd; + String hostName; + boolean exitStatus; + public ExecuteShellCommand(String userName, String hostName, String cmd) { + this.userName = userName; + this.hostName = hostName; + this.cmd = cmd; + } + public void run() { + try { + RemoteExecution.executeCommand(hostName, userName, cmd); + exitStatus = true; + } catch(InterruptedException iexp) { + LOG.warn("Thread is interrupted:" + iexp.getMessage()); + exitStatus = false; + } catch(Exception exp) { + LOG.warn("Exception:" + exp.getMessage()); + exitStatus = false; + } + } + public boolean getStatus(){ + return exitStatus; + } + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java?rev=1077650&r1=1077649&r2=1077650&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java Fri Mar 4 04:40:22 2011 @@ -186,7 +186,9 @@ public class TestJobCacheDirectoriesClea TaskID taskId = TaskID.downgrade(taskinfo.getTaskID()); TaskAttemptID taskAttID = new TaskAttemptID(taskId, taskinfo.numFailedAttempts()); - while(taskinfo.numFailedAttempts() < 4) { + int MAX_MAP_TASK_ATTEMPTS = Integer. + parseInt(jobConf.get("mapred.map.max.attempts")); + while(taskinfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) { NetworkedJob networkJob = jtClient.getClient(). 
new NetworkedJob(jobInfo.getStatus()); networkJob.killTask(taskAttID, true); Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java?rev=1077650&r1=1077649&r2=1077650&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskController.java Fri Mar 4 04:40:22 2011 @@ -5,23 +5,19 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.test.system.MRCluster; -import org.apache.hadoop.mapreduce.test.system.TTClient; import org.apache.hadoop.mapreduce.test.system.JTClient; import org.apache.hadoop.mapreduce.test.system.JTProtocol; import org.apache.hadoop.mapreduce.test.system.JobInfo; -import org.apache.hadoop.mapreduce.test.system.TaskInfo; -import org.apache.hadoop.mapred.ClusterWithLinuxTaskController; +import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.examples.SleepJob; import org.junit.Before; import org.junit.After; import org.junit.Test; import org.junit.Assert; -import java.io.IOException; -import java.util.Hashtable; /** - * Set the invalid configuration to task controller and verify whether the - * task status of a job. + * Set the invalid configuration to task controller and verify the + * job status. */ public class TestTaskController { private static final Log LOG = LogFactory.getLog(TestTaskController.class); @@ -29,19 +25,14 @@ public class TestTaskController { private static MRCluster cluster; private static JTProtocol remoteJTClient; private static JTClient jtClient; - private static String confFile = "mapred-site.xml"; + @Before public void before() throws Exception { - Hashtable<String,Object> prop = new Hashtable<String,Object>(); - prop.put("mapred.local.dir","/mapred/local"); - prop.put("mapred.map.max.attempts", 1L); - prop.put("mapreduce.job.complete.cancel.delegation.tokens", false); String [] expExcludeList = {"java.net.ConnectException", - "java.io.IOException"}; + "java.io.IOException"}; cluster = MRCluster.createCluster(conf); cluster.setExcludeExpList(expExcludeList); cluster.setUp(); - cluster.restartClusterWithNewConfig(prop, confFile); jtClient = cluster.getJTClient(); remoteJTClient = jtClient.getProxy(); } @@ -49,12 +40,11 @@ public class TestTaskController { @After public void after() throws Exception { cluster.tearDown(); - cluster.restart(); } /** * Set the invalid mapred local directory location and run the job. - * Verify whether job has failed or not. + * Verify the job status. * @throws Exception - if an error occurs. */ @Test @@ -63,50 +53,32 @@ public class TestTaskController { conf = remoteJTClient.getDaemonConf(); if (conf.get("mapred.task.tracker.task-controller"). 
equals("org.apache.hadoop.mapred.LinuxTaskController")) { - TaskController linuxTC = new LinuxTaskController(); - linuxTC.setConf(conf); + StringBuffer mapredLocalDir = new StringBuffer(); + LOG.info("JobConf.MAPRED_LOCAL_DIR_PROPERTY:" + conf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)); + mapredLocalDir.append(conf.get(JobConf.MAPRED_LOCAL_DIR_PROPERTY)); + mapredLocalDir.append(","); + mapredLocalDir.append("/mapred/local"); + String jobArgs []= {"-D","mapred.local.dir=" + mapredLocalDir.toString(), + "-m", "1", + "-r", "1", + "-mt", "1000", + "-rt", "1000", + "-recordt","100"}; SleepJob job = new SleepJob(); - job.setConf(conf); - final JobConf jobConf = job.setupJobConf(2, 1, 4000, 4000, 100, 100); + JobConf jobConf = new JobConf(conf); + int exitStatus = ToolRunner.run(jobConf, job, jobArgs); + Assert.assertEquals("Exit Code:", 0, exitStatus); + UtilsForTests.waitFor(100); JobClient jobClient = jtClient.getClient(); - RunningJob runJob = jobClient.submitJob(jobConf); - JobID jobId = runJob.getID(); - Assert.assertTrue("Job has not been started for 1 min.", - jtClient.isJobStarted(jobId)); - TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(jobId); - TaskInfo taskInfo = null; - for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { - taskInfo = taskinfo; - break; - } - } - Assert.assertTrue("Task has not been started for 1 min.", - jtClient.isTaskStarted(taskInfo)); - TaskID tID = TaskID.downgrade(taskInfo.getTaskID()); - TaskAttemptID taskAttID = new TaskAttemptID(tID, 0); - TaskStatus taskStatus = null; - int counter = 0; - while(counter++ < 60) { - if (taskInfo.getTaskStatus().length > 0) { - taskStatus = taskInfo.getTaskStatus()[0]; - break; - } - taskInfo = remoteJTClient.getTaskInfo(tID); - UtilsForTests.waitFor(1000); - } - while (taskInfo.getTaskStatus()[0].getRunState() == - TaskStatus.State.RUNNING) { - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(tID); - } - Assert.assertTrue("Job has not been stopped for 1 min.", - jtClient.isJobStopped(jobId)); - JobInfo jobInfo = remoteJTClient.getJobInfo(jobId); - Assert.assertEquals("Job has not been failed", - jobInfo.getStatus().getRunState(), JobStatus.FAILED); - } else { - Assert.assertTrue("Linux Task controller not found.", false); - } + JobID jobId =jobClient.getAllJobs()[0].getJobID(); + LOG.info("JobId:" + jobId); + if (jobId != null) { + JobInfo jInfo = remoteJTClient.getJobInfo(jobId); + Assert.assertEquals("Job has not been succeeded", + jInfo.getStatus().getRunState(), JobStatus.SUCCEEDED); + } + } else { + Assert.assertTrue("Linux Task controller not found.", false); + } } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java?rev=1077650&r1=1077649&r2=1077650&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java Fri Mar 4 04:40:22 2011 @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.test. 
import org.apache.hadoop.mapreduce.test.system.JobInfo; import org.apache.hadoop.mapreduce.test.system.TaskInfo; import org.apache.hadoop.mapreduce.test.system.TTClient; +import org.apache.hadoop.mapreduce.test.system.JTClient; import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction; import org.apache.hadoop.mapred.JobClient.NetworkedJob; import org.apache.hadoop.io.NullWritable; @@ -53,18 +54,17 @@ public class TestTaskKilling { private static final Log LOG = LogFactory.getLog(TestTaskKilling.class); private static MRCluster cluster; private static JobClient jobClient = null; + private static JTClient jtClient = null; private static JTProtocol remoteJTClient = null; - - public TestTaskKilling() { - } + private static Configuration conf = new Configuration(); @BeforeClass - public static void before() throws Exception { - Configuration conf = new Configuration(); + public static void before() throws Exception { cluster = MRCluster.createCluster(conf); cluster.setUp(); - jobClient = cluster.getJTClient().getClient(); - remoteJTClient = cluster.getJTClient().getProxy(); + jtClient = cluster.getJTClient(); + jobClient = jtClient.getClient(); + remoteJTClient = jtClient.getProxy(); } @AfterClass @@ -79,64 +79,39 @@ public class TestTaskKilling { @Test public void testFailedTaskJobStatus() throws IOException, InterruptedException { - Configuration conf = new Configuration(cluster.getConf()); + conf = remoteJTClient.getDaemonConf(); TaskInfo taskInfo = null; SleepJob job = new SleepJob(); job.setConf(conf); - conf = job.setupJobConf(3, 1, 4000, 4000, 100, 100); - JobConf jobConf = new JobConf(conf); - jobConf.setMaxMapAttempts(20); - jobConf.setMaxReduceAttempts(20); + JobConf jobConf = job.setupJobConf(1, 1, 10000, 4000, 100, 100); RunningJob runJob = jobClient.submitJob(jobConf); - JobID id = runJob.getID(); - JobInfo jInfo = remoteJTClient.getJobInfo(id); - int counter = 0; - while (counter < 60) { - if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { - break; - } else { - UtilsForTests.waitFor(1000); - jInfo = remoteJTClient.getJobInfo(id); - } - counter ++; - } - Assert.assertTrue("Job has not been started for 1 min.", counter != 60); - - TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id); + JobID jobId = runJob.getID(); + JobInfo jInfo = remoteJTClient.getJobInfo(jobId); + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(jobId)); + TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(jobId); for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { + if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) { taskInfo = taskinfo; + break; } } + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); - counter = 0; - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); - while (counter < 60) { - if (taskInfo.getTaskStatus().length > 0) { - if (taskInfo.getTaskStatus()[0].getRunState() - == TaskStatus.State.RUNNING) { - break; - } - } - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); - counter++; - } - Assert.assertTrue("Task has not been started for 1 min.", counter != 60); - + // Fail the running task. 
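// Editor's sketch -- assumptions flagged, not part of the patch. The hunk
// below fails the running attempt with networkJob.killTask(taskAttID, true);
// the boolean requests "fail" rather than "kill" semantics, so the attempt
// counts against mapred.map.max.attempts. The test then polls job completion
// every 100 ms. The bounded-poll idiom these tests lean on, extracted into a
// standalone helper (PollUtil and waitForCondition are assumed names):
final class PollUtil {
  /** Polls cond once per second, at most maxSecs times; true if it held. */
  static boolean waitForCondition(java.util.concurrent.Callable<Boolean> cond,
      int maxSecs) throws Exception {
    for (int i = 0; i < maxSecs; i++) {
      if (cond.call()) {
        return true;
      }
      Thread.sleep(1000);
    }
    return false;
  }
}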
NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus()); TaskID tID = TaskID.downgrade(taskInfo.getTaskID()); TaskAttemptID taskAttID = new TaskAttemptID(tID , 0); - networkJob.killTask(taskAttID, false); + networkJob.killTask(taskAttID, true); LOG.info("Waiting till the job is completed..."); while (!jInfo.getStatus().isJobComplete()) { UtilsForTests.waitFor(100); - jInfo = remoteJTClient.getJobInfo(id); + jInfo = remoteJTClient.getJobInfo(jobId); } - - Assert.assertEquals("JobStatus", jInfo.getStatus().getRunState(), - JobStatus.SUCCEEDED); + Assert.assertEquals("JobStatus", JobStatus.SUCCEEDED, + jInfo.getStatus().getRunState()); } @@ -151,7 +126,6 @@ public class TestTaskKilling { boolean isTempFolderExists = false; String localTaskDir = null; TTClient ttClient = null; - TaskID tID = null; FileStatus filesStatus [] = null; Path inputDir = new Path("input"); Path outputDir = new Path("output"); @@ -164,8 +138,6 @@ public class TestTaskKilling { jconf.setReducerClass(WordCount.Reduce.class); jconf.setNumMapTasks(1); jconf.setNumReduceTasks(1); - jconf.setMaxMapAttempts(20); - jconf.setMaxReduceAttempts(20); jconf.setOutputKeyClass(Text.class); jconf.setOutputValueClass(IntWritable.class); @@ -177,61 +149,46 @@ public class TestTaskKilling { RunningJob runJob = jobClient.submitJob(jconf); JobID id = runJob.getID(); JobInfo jInfo = remoteJTClient.getJobInfo(id); - int counter = 0; - while (counter < 60) { - if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { - break; - } else { - UtilsForTests.waitFor(1000); - jInfo = remoteJTClient.getJobInfo(id); - } - counter ++; - } - Assert.assertTrue("Job has not been started for 1 min.", counter != 60); + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); JobStatus[] jobStatus = jobClient.getAllJobs(); String userName = jobStatus[0].getUsername(); TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id); for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { + if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) { taskInfo = taskinfo; break; } } - counter = 0; - while (counter < 30) { - if (taskInfo.getTaskStatus().length > 0) { - if (taskInfo.getTaskStatus()[0].getRunState() - == TaskStatus.State.RUNNING) { - break; - } - } - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); - counter ++; - } - Assert.assertTrue("Task has not been started for 30 sec.", - counter != 30); + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); - tID = TaskID.downgrade(taskInfo.getTaskID()); + TaskID tID = TaskID.downgrade(taskInfo.getTaskID()); FinishTaskControlAction action = new FinishTaskControlAction(tID); String[] taskTrackers = taskInfo.getTaskTrackers(); - counter = 0; - while (counter < 30) { - if (taskTrackers.length != 0) { + int counter = 0; + TaskInfo prvTaskInfo = taskInfo; + while (counter++ < 30) { + if (taskTrackers.length > 0) { break; + } else { + UtilsForTests.waitFor(100); + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + if (taskInfo == null) { + taskInfo = prvTaskInfo; + } else { + prvTaskInfo = taskInfo; + } + taskTrackers = taskInfo.getTaskTrackers(); } - UtilsForTests.waitFor(100); - taskTrackers = taskInfo.getTaskTrackers(); - counter ++; } - + Assert.assertTrue("TaskTracker is not found.", taskTrackers.length > 0); String hostName = taskTrackers[0].split("_")[1]; hostName = hostName.split(":")[0]; - ttClient = cluster.getTTClient(hostName); - 
ttClient.getProxy().sendAction(action); + ttClient = cluster.getTTClient(hostName); String localDirs[] = ttClient.getMapredLocalDirs(); TaskAttemptID taskAttID = new TaskAttemptID(tID, 0); for (String localDir : localDirs) { @@ -241,46 +198,49 @@ public class TestTaskKilling { filesStatus = ttClient.listStatus(localTaskDir, true); if (filesStatus.length > 0) { isTempFolderExists = true; - NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus()); - networkJob.killTask(taskAttID, false); break; } } - + Assert.assertTrue("Task Attempt directory " + taskAttID + " has not been found while task was running.", isTempFolderExists); + + NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus()); + networkJob.killTask(taskAttID, false); + ttClient.getProxy().sendAction(action); + taskInfo = remoteJTClient.getTaskInfo(tID); + while(taskInfo.getTaskStatus()[0].getRunState() == + TaskStatus.State.RUNNING) { + UtilsForTests.waitFor(1000); + taskInfo = remoteJTClient.getTaskInfo(tID); + } + UtilsForTests.waitFor(1000); + taskInfo = remoteJTClient.getTaskInfo(tID); + Assert.assertTrue("Task status has not been changed to KILLED.", + (TaskStatus.State.KILLED == + taskInfo.getTaskStatus()[0].getRunState() + || TaskStatus.State.KILLED_UNCLEAN == + taskInfo.getTaskStatus()[0].getRunState())); taskInfo = remoteJTClient.getTaskInfo(tID); - counter = 0; - while (counter < 60) { - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(tID); + while (counter++ < 60) { filesStatus = ttClient.listStatus(localTaskDir, true); if (filesStatus.length == 0) { break; + } else { + UtilsForTests.waitFor(100); } - counter ++; } - Assert.assertTrue("Task attempt temporary folder has not been cleaned.", isTempFolderExists && filesStatus.length == 0); - counter = 0; - while (counter < 30) { - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(tID); - counter ++; + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(id); + LOG.info("Waiting till the job is completed..."); + while (!jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(100); + jInfo = remoteJTClient.getJobInfo(id); } - taskInfo = remoteJTClient.getTaskInfo(tID); - Assert.assertEquals("Task status has not been changed to KILLED.", - TaskStatus.State.KILLED, - taskInfo.getTaskStatus()[0].getRunState()); - //Kill the job before testcase finishes. 
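// Editor's sketch -- assumed helper, not part of the patch. The replacement
// assertion above accepts either KILLED or KILLED_UNCLEAN because an attempt
// observed immediately after killTask() may not have run its cleanup attempt
// yet; the analogous failure-path check later in this patch likewise accepts
// FAILED or FAILED_UNCLEAN. As a predicate:
final class TaskStateCheck {
  /** True once a killed attempt has reached either terminal kill state. */
  static boolean isKilled(TaskStatus.State s) {
    return s == TaskStatus.State.KILLED
        || s == TaskStatus.State.KILLED_UNCLEAN;
  }
}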
- runJob.killJob(); - - Assert.assertTrue("Job has not been stopped for 1 min.", - ((cluster.getJTClient()).isJobStopped(id))); - } private void cleanup(Path dir, Configuration conf) throws @@ -323,81 +283,51 @@ public class TestTaskKilling { TaskInfo taskInfo = null; TaskID tID = null; boolean isTempFolderExists = false; - Path inputDir = new Path("input"); - Path outputDir = new Path("output"); - Configuration conf = new Configuration(cluster.getConf()); - JobConf jconf = new JobConf(conf); - jconf.setJobName("Task Failed job"); - jconf.setJarByClass(UtilsForTests.class); - jconf.setMapperClass(FailedMapperClass.class); - jconf.setNumMapTasks(1); - jconf.setNumReduceTasks(0); - jconf.setMaxMapAttempts(1); - cleanup(inputDir, conf); - cleanup(outputDir, conf); - createInput(inputDir, conf); - FileInputFormat.setInputPaths(jconf, inputDir); - FileOutputFormat.setOutputPath(jconf, outputDir); - RunningJob runJob = jobClient.submitJob(jconf); + conf = remoteJTClient.getDaemonConf(); + SleepJob job = new SleepJob(); + job.setConf(conf); + JobConf jobConf = job.setupJobConf(1, 0, 10000,100, 10, 10); + RunningJob runJob = jobClient.submitJob(jobConf); JobID id = runJob.getID(); JobInfo jInfo = remoteJTClient.getJobInfo(id); - - int counter = 0; - while (counter < 60) { - if (jInfo.getStatus().getRunState() == JobStatus.RUNNING) { - break; - } else { - UtilsForTests.waitFor(1000); - jInfo = remoteJTClient.getJobInfo(id); - } - counter ++; - } - Assert.assertTrue("Job has not been started for 1 min.", counter != 60); + Assert.assertTrue("Job has not been started for 1 min.", + jtClient.isJobStarted(id)); JobStatus[] jobStatus = jobClient.getAllJobs(); String userName = jobStatus[0].getUsername(); TaskInfo[] taskInfos = remoteJTClient.getTaskInfo(id); for (TaskInfo taskinfo : taskInfos) { - if (!taskinfo.isSetupOrCleanup()) { + if (!taskinfo.isSetupOrCleanup() && taskinfo.getTaskID().isMap()) { taskInfo = taskinfo; break; } } - + Assert.assertTrue("Task has not been started for 1 min.", + jtClient.isTaskStarted(taskInfo)); + tID = TaskID.downgrade(taskInfo.getTaskID()); FinishTaskControlAction action = new FinishTaskControlAction(tID); String[] taskTrackers = taskInfo.getTaskTrackers(); - counter = 0; - while (counter < 30) { - if (taskTrackers.length != 0) { + int counter = 0; + TaskInfo prvTaskInfo = taskInfo; + while (counter++ < 30) { + if (taskTrackers.length > 0) { break; + } else { + UtilsForTests.waitFor(1000); + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + if (taskInfo == null) { + taskInfo = prvTaskInfo; + } else { + prvTaskInfo = taskInfo; + } + taskTrackers = taskInfo.getTaskTrackers(); } - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); - taskTrackers = taskInfo.getTaskTrackers(); - counter ++; } - Assert.assertTrue("Task tracker not found.", taskTrackers.length != 0); + Assert.assertTrue("Task tracker not found.", taskTrackers.length > 0); String hostName = taskTrackers[0].split("_")[1]; hostName = hostName.split(":")[0]; ttClient = cluster.getTTClient(hostName); - ttClient.getProxy().sendAction(action); - - counter = 0; - while(counter < 60) { - if (taskInfo.getTaskStatus().length > 0) { - if (taskInfo.getTaskStatus()[0].getRunState() - == TaskStatus.State.RUNNING) { - break; - } - } - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); - counter ++; - } - Assert.assertTrue("Task has not been started for 1 min.", - counter != 60); - String localDirs[] = 
ttClient.getMapredLocalDirs(); TaskAttemptID taskAttID = new TaskAttemptID(tID, 0); for (String localDir : localDirs) { @@ -409,57 +339,49 @@ public class TestTaskKilling { isTempFolderExists = true; break; } - } - - taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + } + Assert.assertTrue("Task Attempt directory " + taskAttID + " has not been found while task was running.", isTempFolderExists); - counter = 0; - while (counter < 30) { - if (taskInfo.getTaskStatus().length > 0) { - break; + boolean isFailTask = false; + JobInfo jobInfo = remoteJTClient.getJobInfo(id); + int MAX_MAP_TASK_ATTEMPTS = Integer.parseInt( + jobConf.get("mapred.map.max.attempts")); + if (!isFailTask) { + TaskID taskId = TaskID.downgrade(taskInfo.getTaskID()); + TaskAttemptID tAttID = new TaskAttemptID(taskId, + taskInfo.numFailedAttempts()); + while(taskInfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) { + NetworkedJob networkJob = jtClient.getClient(). + new NetworkedJob(jobInfo.getStatus()); + networkJob.killTask(taskAttID, true); + taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID()); + taskAttID = new TaskAttemptID(taskId, taskInfo.numFailedAttempts()); + } + isFailTask=true; } - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(tID); - counter ++; - } - - while (taskInfo.getTaskStatus()[0].getRunState() == - TaskStatus.State.RUNNING) { - UtilsForTests.waitFor(1000); - taskInfo = remoteJTClient.getTaskInfo(tID); - } - Assert.assertEquals("Task status has not been changed to FAILED.", - taskInfo.getTaskStatus()[0].getRunState(), - TaskStatus.State.FAILED); - + + ttClient.getProxy().sendAction(action); + taskInfo = remoteJTClient.getTaskInfo(tID); + Assert.assertTrue("Task status has not been changed to FAILED.", + TaskStatus.State.FAILED == + taskInfo.getTaskStatus()[0].getRunState() + || TaskStatus.State.FAILED_UNCLEAN == + taskInfo.getTaskStatus()[0].getRunState()); + UtilsForTests.waitFor(1000); filesStatus = ttClient.listStatus(localTaskDir, true); Assert.assertTrue("Temporary folder has not been cleanup.", filesStatus.length == 0); - - } - - public static class FailedMapperClass implements - Mapper<NullWritable, NullWritable, NullWritable, NullWritable> { - public void configure(JobConf job) { - } - public void map(NullWritable key, NullWritable value, - OutputCollector<NullWritable, NullWritable> output, - Reporter reporter) throws IOException { - int counter = 0; - while (counter < 240) { - UtilsForTests.waitFor(1000); - counter ++; - } - if (counter == 240) { - throw new IOException(); - } - } - public void close() { + UtilsForTests.waitFor(1000); + jInfo = remoteJTClient.getJobInfo(id); + LOG.info("Waiting till the job is completed..."); + while (!jInfo.getStatus().isJobComplete()) { + UtilsForTests.waitFor(100); + jInfo = remoteJTClient.getJobInfo(id); } } - + @Test /** * This tests verification of job killing by killing of all task
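The central stabilization technique in this patch is suspending and resuming a
task attempt's process with POSIX signals: the ExecuteShellCommand thread added
to TestChildsKillingOfSuspendTask wraps RemoteExecution.executeCommand to run
"kill -SIGSTOP <pid>" (and later "kill -SIGCONT <pid>") as the job's user on
the tasktracker host, replacing the earlier TTProtocol suspend/resume calls. A
minimal local sketch of the same idea, assuming a Unix host with kill on the
PATH and no remote-execution layer (class and method names here are
illustrative, not from the patch):

    public class SignalSketch {
        /** Sends the named signal to the pid via the kill binary; true on exit 0. */
        static boolean signal(String pid, String sig)
                throws java.io.IOException, InterruptedException {
            Process p = new ProcessBuilder("kill", "-" + sig, pid).start();
            return p.waitFor() == 0;
        }

        public static void main(String[] args) throws Exception {
            String pid = args[0];
            if (!signal(pid, "STOP")) {    // suspend, as the tests do with SIGSTOP
                throw new IllegalStateException("suspend failed for pid " + pid);
            }
            // The first test waits past mapred.task.timeout (30s in this config)
            // so the tracker fails the attempt; the second test resumes early.
            Thread.sleep(5000);
            if (!signal(pid, "CONT")) {    // resume, as the tests do with SIGCONT
                throw new IllegalStateException("resume failed for pid " + pid);
            }
        }
    }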