Repository: hadoop
Updated Branches:
refs/heads/branch-2 ef3e01a1e -> 2f1821850
MAPREDUCE-5870. Support for passing Job priority through Application Submission
Context in Mapreduce Side. Contributed by Sunil G
(cherry picked from commit f634505d48d97e4d461980d68a0cbdf87223646d)
Conflicts:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2f182185
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2f182185
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2f182185
Branch: refs/heads/branch-2
Commit: 2f18218508524b01a3e19b90950f63920f350c6f
Parents: ef3e01a
Author: Jason Lowe <[email protected]>
Authored: Tue Nov 24 22:15:37 2015 +0000
Committer: Jason Lowe <[email protected]>
Committed: Tue Nov 24 22:15:37 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/local/LocalContainerAllocator.java | 6 ++
.../mapreduce/v2/app/job/impl/TestJobImpl.java | 2 +-
.../app/local/TestLocalContainerAllocator.java | 5 +-
.../apache/hadoop/mapreduce/TypeConverter.java | 48 +++++++++-
.../hadoop/mapreduce/TestTypeConverter.java | 4 +
.../java/org/apache/hadoop/mapred/JobConf.java | 92 ++++++++++++++++++--
.../org/apache/hadoop/mapred/JobPriority.java | 6 ++
.../org/apache/hadoop/mapred/JobStatus.java | 2 +-
.../java/org/apache/hadoop/mapreduce/Job.java | 67 ++++++++++++--
.../apache/hadoop/mapreduce/JobPriority.java | 7 +-
.../org/apache/hadoop/mapreduce/tools/CLI.java | 29 ++++--
.../org/apache/hadoop/mapred/TestJobConf.java | 47 +++++++++-
.../org/apache/hadoop/mapreduce/TestJob.java | 2 +-
.../org/apache/hadoop/mapred/YARNRunner.java | 19 +++-
.../apache/hadoop/mapred/TestYARNRunner.java | 25 ++++++
.../hadoop/mapreduce/TestMRJobClient.java | 5 +-
.../apache/hadoop/mapreduce/v2/TestMRJobs.java | 63 ++++++++++++++
18 files changed, 401 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt
b/hadoop-mapreduce-project/CHANGES.txt
index e84be3a..324f3a4 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -139,6 +139,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6499. Add elapsed time for retired job in JobHistoryServer WebUI.
(Lin Yiqun via aajisaka)
+ MAPREDUCE-5870. Support for passing Job priority through Application
+ Submission Context in Mapreduce Side (Sunil G via jlowe)
+
OPTIMIZATIONS
MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
index aed1023..7437357 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
@@ -43,6 +43,7 @@ import
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.ClientRMProxy;
@@ -146,6 +147,11 @@ public class LocalContainerAllocator extends RMCommunicator
if (token != null) {
updateAMRMToken(token);
}
+ Priority priorityFromResponse = Priority.newInstance(allocateResponse
+ .getApplicationPriority().getPriority());
+
+ // Update the job priority to Job directly.
+ getJob().setJobPriority(priorityFromResponse);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
index 9e18920..33c5c48 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
@@ -914,7 +914,7 @@ public class TestJobImpl {
assertJobState(job, JobStateInternal.RUNNING);
// Update priority of job to 8, and see whether its updated
- Priority updatedPriority = Priority.newInstance(5);
+ Priority updatedPriority = Priority.newInstance(8);
job.setJobPriority(updatedPriority);
assertJobState(job, JobStateInternal.RUNNING);
Priority jobPriority = job.getReport().getJobPriority();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 167d804..38df8f0 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -245,7 +246,7 @@ public class TestLocalContainerAllocator {
amToken.getIdentifier(), amToken.getKind().toString(),
amToken.getPassword(), amToken.getService().toString());
}
- return AllocateResponse.newInstance(responseId,
+ AllocateResponse response = AllocateResponse.newInstance(responseId,
Collections.<ContainerStatus>emptyList(),
Collections.<Container>emptyList(),
Collections.<NodeReport>emptyList(),
@@ -254,6 +255,8 @@ public class TestLocalContainerAllocator {
yarnToken,
Collections.<Container>emptyList(),
Collections.<Container>emptyList());
+ response.setApplicationPriority(Priority.newInstance(0));
+ return response;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
index 88f61b0..4af5b89 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
@@ -84,6 +84,25 @@ public class TypeConverter {
return jobId;
}
+ public static int toYarnApplicationPriority(String priority) {
+ JobPriority jobPriority = JobPriority.valueOf(priority);
+ switch (jobPriority) {
+ case VERY_HIGH :
+ return 5;
+ case HIGH :
+ return 4;
+ case NORMAL :
+ return 3;
+ case LOW :
+ return 2;
+ case VERY_LOW :
+ return 1;
+ case DEFAULT :
+ return 0;
+ }
+ throw new IllegalArgumentException("Unrecognized priority: " + priority);
+ }
+
private static String fromClusterTimeStamp(long clusterTimeStamp) {
return Long.toString(clusterTimeStamp);
}
@@ -165,6 +184,8 @@ public class TypeConverter {
return Phase.REDUCE;
case CLEANUP:
return Phase.CLEANUP;
+ default:
+ break;
}
throw new YarnRuntimeException("Unrecognized Phase: " + phase);
}
@@ -327,10 +348,33 @@ public class TypeConverter {
return JobPriority.VERY_LOW;
case 0 :
return JobPriority.DEFAULT;
+ default :
+ break;
}
return JobPriority.UNDEFINED_PRIORITY;
}
+ public static org.apache.hadoop.mapreduce.JobPriority
+ fromYarnApplicationPriority(int priority) {
+ switch (priority) {
+ case 5 :
+ return org.apache.hadoop.mapreduce.JobPriority.VERY_HIGH;
+ case 4 :
+ return org.apache.hadoop.mapreduce.JobPriority.HIGH;
+ case 3 :
+ return org.apache.hadoop.mapreduce.JobPriority.NORMAL;
+ case 2 :
+ return org.apache.hadoop.mapreduce.JobPriority.LOW;
+ case 1 :
+ return org.apache.hadoop.mapreduce.JobPriority.VERY_LOW;
+ case 0 :
+ return org.apache.hadoop.mapreduce.JobPriority.DEFAULT;
+ default :
+ break;
+ }
+ return org.apache.hadoop.mapreduce.JobPriority.UNDEFINED_PRIORITY;
+ }
+
public static org.apache.hadoop.mapreduce.QueueState fromYarn(
QueueState state) {
org.apache.hadoop.mapreduce.QueueState qState =
@@ -462,7 +506,9 @@ public class TypeConverter {
TypeConverter.fromYarn(application.getApplicationId()),
0.0f, 0.0f, 0.0f, 0.0f,
TypeConverter.fromYarn(application.getYarnApplicationState(),
application.getFinalApplicationStatus()),
- org.apache.hadoop.mapreduce.JobPriority.NORMAL,
+ fromYarnApplicationPriority(
+ (application.getPriority() == null) ? 0 :
+ application.getPriority().getPriority()),
application.getUser(), application.getName(),
application.getQueue(), jobFile, trackingUrl, false
);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
index 60ce170..2027d01 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
@@ -85,6 +85,7 @@ public class TestTypeConverter {
applicationReport.setStartTime(appStartTime);
applicationReport.setFinishTime(appFinishTime);
applicationReport.setUser("TestTypeConverter-user");
+ applicationReport.setPriority(Priority.newInstance(3));
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);
@@ -99,6 +100,7 @@ public class TestTypeConverter {
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(appFinishTime, jobStatus.getFinishTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
+ Assert.assertEquals(JobPriority.NORMAL, jobStatus.getPriority());
}
@Test
@@ -113,6 +115,7 @@ public class TestTypeConverter {
when(mockReport.getYarnApplicationState()).thenReturn(YarnApplicationState.KILLED);
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
+ when(mockReport.getPriority()).thenReturn(Priority.newInstance(4));
String jobFile = "dummy-path/job.xml";
try {
@@ -146,6 +149,7 @@ public class TestTypeConverter {
Assert.assertEquals("num used slots info set incorrectly", 3,
status.getNumUsedSlots());
Assert.assertEquals("rsvd mem info set incorrectly", 2048,
status.getReservedMem());
Assert.assertEquals("used mem info set incorrectly", 2048,
status.getUsedMem());
+ Assert.assertEquals("priority set incorrectly", JobPriority.HIGH,
status.getPriority());
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
index c2bb1d5..9ace6e8 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
@@ -1554,25 +1554,105 @@ public class JobConf extends Configuration {
/**
* Set {@link JobPriority} for this job.
- *
+ *
* @param prio the {@link JobPriority} for this job.
*/
public void setJobPriority(JobPriority prio) {
set(JobContext.PRIORITY, prio.toString());
}
-
+
+ /**
+ * Set {@link JobPriority} for this job.
+ *
+ * @param prio the {@link JobPriority} for this job.
+ */
+ public void setJobPriorityAsInteger(int prio) {
+ set(JobContext.PRIORITY, Integer.toString(prio));
+ }
+
/**
* Get the {@link JobPriority} for this job.
- *
+ *
* @return the {@link JobPriority} for this job.
*/
public JobPriority getJobPriority() {
String prio = get(JobContext.PRIORITY);
- if(prio == null) {
+ if (prio == null) {
+ return JobPriority.DEFAULT;
+ }
+
+ JobPriority priority = JobPriority.DEFAULT;
+ try {
+ priority = JobPriority.valueOf(prio);
+ } catch (IllegalArgumentException e) {
+ return convertToJobPriority(Integer.parseInt(prio));
+ }
+ return priority;
+ }
+
+ /**
+ * Get the priority for this job.
+ *
+ * @return the priority for this job.
+ */
+ public int getJobPriorityAsInteger() {
+ String priority = get(JobContext.PRIORITY);
+ if (priority == null) {
+ return 0;
+ }
+
+ int jobPriority = 0;
+ try {
+ jobPriority = convertPriorityToInteger(priority);
+ } catch (IllegalArgumentException e) {
+ return Integer.parseInt(priority);
+ }
+ return jobPriority;
+ }
+
+ private int convertPriorityToInteger(String priority) {
+ JobPriority jobPriority = JobPriority.valueOf(priority);
+ switch (jobPriority) {
+ case VERY_HIGH :
+ return 5;
+ case HIGH :
+ return 4;
+ case NORMAL :
+ return 3;
+ case LOW :
+ return 2;
+ case VERY_LOW :
+ return 1;
+ case DEFAULT :
+ return 0;
+ default:
+ break;
+ }
+
+ // If a user sets the priority as "UNDEFINED_PRIORITY", we can return
+ // 0 which is also default value.
+ return 0;
+ }
+
+ private JobPriority convertToJobPriority(int priority) {
+ switch (priority) {
+ case 5 :
+ return JobPriority.VERY_HIGH;
+ case 4 :
+ return JobPriority.HIGH;
+ case 3 :
return JobPriority.NORMAL;
+ case 2 :
+ return JobPriority.LOW;
+ case 1 :
+ return JobPriority.VERY_LOW;
+ case 0 :
+ return JobPriority.DEFAULT;
+ default:
+ break;
}
-
- return JobPriority.valueOf(prio);
+
+ return JobPriority.UNDEFINED_PRIORITY;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java
index b76d46d..cdc1855c 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobPriority.java
@@ -22,6 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* Used to describe the priority of the running job.
+ * DEFAULT : While submitting a job, if the user is not specifying priority,
+ * YARN has the capability to pick the default priority as per its config.
+ * Hence MapReduce can indicate such cases with this new enum.
+ * UNDEFINED_PRIORITY : YARN supports priority as an integer. Hence other than
+ * the five defined enums, YARN can consider other integers also. To generalize
+ * such cases, this specific enum is used.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
index bf2577d..5eebf35 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
@@ -148,7 +148,7 @@ public class JobStatus extends
org.apache.hadoop.mapreduce.JobStatus {
String user, String jobName,
String jobFile, String trackingUrl) {
this(jobid, mapProgress, reduceProgress, cleanupProgress, runState,
- JobPriority.NORMAL, user, jobName, jobFile, trackingUrl);
+ JobPriority.DEFAULT, user, jobName, jobFile, trackingUrl);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
index 9eea4cc..ded9d65 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
@@ -407,7 +407,7 @@ public class Job extends JobContextImpl implements
JobContext {
/**
* Get scheduling info of the job.
*
- * @return the scheduling info of the job
+ * @return the priority info of the job
*/
public JobPriority getPriority() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
@@ -635,27 +635,78 @@ public class Job extends JobContextImpl implements
JobContext {
/**
* Set the priority of a running job.
- * @param priority the new priority for the job.
+ * @param jobPriority the new priority for the job.
* @throws IOException
*/
- public void setPriority(JobPriority priority)
- throws IOException, InterruptedException {
+ public void setPriority(JobPriority jobPriority) throws IOException,
+ InterruptedException {
+ if (state == JobState.DEFINE) {
+ if (jobPriority == JobPriority.UNDEFINED_PRIORITY) {
+ conf.setJobPriorityAsInteger(convertPriorityToInteger(jobPriority));
+ } else {
+ conf.setJobPriority(org.apache.hadoop.mapred.JobPriority
+ .valueOf(jobPriority.name()));
+ }
+ } else {
+ ensureState(JobState.RUNNING);
+ final int tmpPriority = convertPriorityToInteger(jobPriority);
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws IOException, InterruptedException {
+ cluster.getClient()
+ .setJobPriority(getJobID(), Integer.toString(tmpPriority));
+ return null;
+ }
+ });
+ }
+ }
+
+ /**
+ * Set the priority of a running job.
+ *
+ * @param jobPriority
+ * the new priority for the job.
+ * @throws IOException
+ */
+ public void setPriorityAsInteger(int jobPriority) throws IOException,
+ InterruptedException {
if (state == JobState.DEFINE) {
- conf.setJobPriority(
- org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
+ conf.setJobPriorityAsInteger(jobPriority);
} else {
ensureState(JobState.RUNNING);
- final JobPriority tmpPriority = priority;
+ final int tmpPriority = jobPriority;
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws IOException, InterruptedException {
- cluster.getClient().setJobPriority(getJobID(),
tmpPriority.toString());
+ cluster.getClient()
+ .setJobPriority(getJobID(), Integer.toString(tmpPriority));
return null;
}
});
}
}
+ private int convertPriorityToInteger(JobPriority jobPriority) {
+ switch (jobPriority) {
+ case VERY_HIGH :
+ return 5;
+ case HIGH :
+ return 4;
+ case NORMAL :
+ return 3;
+ case LOW :
+ return 2;
+ case VERY_LOW :
+ return 1;
+ case DEFAULT :
+ return 0;
+ default:
+ break;
+ }
+ // For UNDEFINED_PRIORITY, we can set it to default for better handling
+ return 0;
+ }
+
/**
* Get events indicating completion (success/failure) of component tasks.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java
index 7178568..4069f1f 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobPriority.java
@@ -22,7 +22,12 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* Used to describe the priority of the running job.
- *
+ * DEFAULT : While submitting a job, if the user is not specifying priority,
+ * YARN has the capability to pick the default priority as per its config.
+ * Hence MapReduce can indicate such cases with this new enum.
+ * UNDEFINED_PRIORITY : YARN supports priority as an integer. Hence other than
+ * the five defined enums, YARN can consider other integers also. To generalize
+ * such cases, this specific enum is used.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
index 2e185cb..86fc3c8 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java
@@ -97,6 +97,7 @@ public class CLI extends Configured implements Tool {
String taskState = null;
int fromEvent = 0;
int nEvents = 0;
+ int jpvalue = 0;
boolean getStatus = false;
boolean getCounter = false;
boolean killJob = false;
@@ -149,11 +150,15 @@ public class CLI extends Configured implements Tool {
}
jobid = argv[1];
try {
- jp = JobPriority.valueOf(argv[2]);
+ jp = JobPriority.valueOf(argv[2]);
} catch (IllegalArgumentException iae) {
- LOG.info(iae);
- displayUsage(cmd);
- return exitCode;
+ try {
+ jpvalue = Integer.parseInt(argv[2]);
+ } catch (NumberFormatException ne) {
+ LOG.info(ne);
+ displayUsage(cmd);
+ return exitCode;
+ }
}
setJobPriority = true;
} else if ("-events".equals(cmd)) {
@@ -322,7 +327,11 @@ public class CLI extends Configured implements Tool {
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
- job.setPriority(jp);
+ if (jp != null) {
+ job.setPriority(jp);
+ } else {
+ job.setPriorityAsInteger(jpvalue);
+ }
System.out.println("Changed job priority.");
exitCode = 0;
}
@@ -408,6 +417,10 @@ public class CLI extends Configured implements Tool {
private String getJobPriorityNames() {
StringBuffer sb = new StringBuffer();
for (JobPriority p : JobPriority.values()) {
+ // UNDEFINED_PRIORITY need not to be displayed in usage
+ if (JobPriority.UNDEFINED_PRIORITY == p) {
+ continue;
+ }
sb.append(p.name()).append(" ");
}
return sb.substring(0, sb.length()-1);
@@ -444,7 +457,8 @@ public class CLI extends Configured implements Tool {
} else if ("-set-priority".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
"Valid values for priorities are: "
- + jobPriorityValues);
+ + jobPriorityValues
+ + ". In addition to this, integers also can be used.");
} else if ("-list-active-trackers".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-blacklisted-trackers".equals(cmd)) {
@@ -465,7 +479,8 @@ public class CLI extends Configured implements Tool {
System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
System.err.printf("\t[-kill <job-id>]%n");
System.err.printf("\t[-set-priority <job-id> <priority>]. " +
- "Valid values for priorities are: " + jobPriorityValues + "%n");
+ "Valid values for priorities are: " + jobPriorityValues +
+ ". In addition to this, integers also can be used." + "%n");
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
System.err.printf("\t[-history <jobHistoryFile>]%n");
System.err.printf("\t[-list [all]]%n");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
index a68ba4f..a87ad70 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
@@ -99,7 +99,7 @@ public class TestJobConf {
assertEquals(70, conf.getMaxReduceTaskFailuresPercent());
// by default
- assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
+ assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
conf.setJobPriority(JobPriority.HIGH);
assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
@@ -359,4 +359,49 @@ public class TestJobConf {
jobConf.getMaxTaskFailuresPerTracker() < jobConf.getMaxReduceAttempts()
);
}
+
+ /**
+ * Test various Job Priority
+ */
+ @Test
+ public void testJobPriorityConf() {
+ JobConf conf = new JobConf();
+
+ // by default
+ assertEquals(JobPriority.DEFAULT.name(), conf.getJobPriority().name());
+ assertEquals(0, conf.getJobPriorityAsInteger());
+ // Set JobPriority.LOW using old API, and verify output from both getter
+ conf.setJobPriority(JobPriority.LOW);
+ assertEquals(JobPriority.LOW.name(), conf.getJobPriority().name());
+ assertEquals(2, conf.getJobPriorityAsInteger());
+
+ // Set JobPriority.VERY_HIGH using old API, and verify output
+ conf.setJobPriority(JobPriority.VERY_HIGH);
+ assertEquals(JobPriority.VERY_HIGH.name(), conf.getJobPriority().name());
+ assertEquals(5, conf.getJobPriorityAsInteger());
+
+ // Set 3 as priority using new API, and verify output from both getter
+ conf.setJobPriorityAsInteger(3);
+ assertEquals(JobPriority.NORMAL.name(), conf.getJobPriority().name());
+ assertEquals(3, conf.getJobPriorityAsInteger());
+
+ // Set 4 as priority using new API, and verify output
+ conf.setJobPriorityAsInteger(4);
+ assertEquals(JobPriority.HIGH.name(), conf.getJobPriority().name());
+ assertEquals(4, conf.getJobPriorityAsInteger());
+ // Now set some high integer values and verify output from old api
+ conf.setJobPriorityAsInteger(57);
+ assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
+ .name());
+ assertEquals(57, conf.getJobPriorityAsInteger());
+
+ // Error case where UNDEFINED_PRIORITY is set explicitly
+ conf.setJobPriority(JobPriority.UNDEFINED_PRIORITY);
+ assertEquals(JobPriority.UNDEFINED_PRIORITY.name(), conf.getJobPriority()
+ .name());
+
+ // As UNDEFINED_PRIORITY cannot be mapped to any integer value, resetting
+ // to default as 0.
+ assertEquals(0, conf.getJobPriorityAsInteger());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
index 94f49ac..71bacf7 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
@@ -41,7 +41,7 @@ public class TestJob {
when(cluster.getClient()).thenReturn(client);
JobID jobid = new JobID("1014873536921", 6);
JobStatus status = new JobStatus(jobid, 0.0f, 0.0f, 0.0f, 0.0f,
- State.FAILED, JobPriority.NORMAL, "root", "TestJobToString",
+ State.FAILED, JobPriority.DEFAULT, "root", "TestJobToString",
"job file", "tracking url");
when(client.getJobStatus(jobid)).thenReturn(status);
when(client.getTaskReports(jobid, TaskType.MAP)).thenReturn(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 84a27d9..f15b5c1 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -562,13 +562,30 @@ public class YARNRunner implements ClientProtocol {
appContext.setApplicationTags(new HashSet<String>(tagsFromConf));
}
+ String jobPriority = jobConf.get(MRJobConfig.PRIORITY);
+ if (jobPriority != null) {
+ int iPriority;
+ try {
+ iPriority = TypeConverter.toYarnApplicationPriority(jobPriority);
+ } catch (IllegalArgumentException e) {
+ iPriority = Integer.parseInt(jobPriority);
+ }
+ appContext.setPriority(Priority.newInstance(iPriority));
+ }
+
return appContext;
}
@Override
public void setJobPriority(JobID arg0, String arg1) throws IOException,
InterruptedException {
- resMgrDelegate.setJobPriority(arg0, arg1);
+ ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
+ try {
+ resMgrDelegate.updateApplicationPriority(appId,
+ Priority.newInstance(Integer.parseInt(arg1)));
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index 010446f..3293a9d 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -604,6 +605,30 @@ public class TestYARNRunner extends TestCase {
assertEquals("Bad SHELL setting", USER_SHELL, shell);
}
+ @Test
+ public void testJobPriority() throws Exception {
+ JobConf jobConf = new JobConf();
+
+ jobConf.set(MRJobConfig.PRIORITY, "LOW");
+
+ YARNRunner yarnRunner = new YARNRunner(jobConf);
+ ApplicationSubmissionContext appSubCtx = buildSubmitContext(yarnRunner,
+ jobConf);
+
+ // 2 corresponds to LOW
+ assertEquals(appSubCtx.getPriority(), Priority.newInstance(2));
+
+ // Set an integer explicitly
+ jobConf.set(MRJobConfig.PRIORITY, "12");
+
+ yarnRunner = new YARNRunner(jobConf);
+ appSubCtx = buildSubmitContext(yarnRunner,
+ jobConf);
+
+ // Verify whether 12 is set to submission context
+ assertEquals(appSubCtx.getPriority(), Priority.newInstance(12));
+ }
+
private ApplicationSubmissionContext buildSubmitContext(
YARNRunner yarnRunner, JobConf jobConf) throws IOException {
File jobxml = new File(testWorkDir, MRJobConfig.JOB_CONF_FILE);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
index fd36285..bd78b15 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestMRJobClient.java
@@ -527,8 +527,9 @@ public class TestMRJobClient extends
ClusterMapReduceTestCase {
exitCode = runTool(conf, createJobClient(), new String[] { "-set-priority",
jobId, "VERY_LOW" }, new ByteArrayOutputStream());
assertEquals("Exit code", 0, exitCode);
- // because this method does not implemented still.
- verifyJobPriority(jobId, "NORMAL", conf, createJobClient());
+ // set-priority is fired after job is completed in YARN, hence need not
+ // have to update the priority.
+ verifyJobPriority(jobId, "DEFAULT", conf, createJobClient());
}
protected CLI createJobClient() throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2f182185/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
----------------------------------------------------------------------
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
index 2973c39..e685c65 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -159,6 +160,7 @@ public class TestMRJobs {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+ conf.setInt("yarn.cluster.max-application-priority", 10);
mrCluster.init(conf);
mrCluster.start();
}
@@ -242,6 +244,67 @@ public class TestMRJobs {
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
+ @Test(timeout = 3000000)
+ public void testJobWithChangePriority() throws Exception {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ Configuration sleepConf = new Configuration(mrCluster.getConfig());
+ // set master address to local to test that local mode applied if framework
+ // equals local
+ sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
+ sleepConf
+ .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5);
+
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(sleepConf);
+ Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1);
+
+ job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+ job.setJarByClass(SleepJob.class);
+ job.setMaxMapAttempts(1); // speed up failures
+ job.submit();
+
+ // Set the priority to HIGH
+ job.setPriority(JobPriority.HIGH);
+ waitForPriorityToUpdate(job, JobPriority.HIGH);
+ // Verify the priority from job itself
+ Assert.assertEquals(job.getPriority(), JobPriority.HIGH);
+
+ // Change priority to NORMAL (3) with new api
+ job.setPriorityAsInteger(3); // Verify the priority from job itself
+ waitForPriorityToUpdate(job, JobPriority.NORMAL);
+ Assert.assertEquals(job.getPriority(), JobPriority.NORMAL);
+
+ // Change priority to a high integer value with new api
+ job.setPriorityAsInteger(89); // Verify the priority from job itself
+ waitForPriorityToUpdate(job, JobPriority.UNDEFINED_PRIORITY);
+ Assert.assertEquals(job.getPriority(), JobPriority.UNDEFINED_PRIORITY);
+
+ boolean succeeded = job.waitForCompletion(true);
+ Assert.assertTrue(succeeded);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ }
+
+ private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus)
+ throws IOException, InterruptedException {
+ // Max wait time to get the priority update can be kept as 20sec (100 *
+ // 100ms)
+ int waitCnt = 200;
+ while (waitCnt-- > 0) {
+ if (job.getPriority().equals(expectedStatus)) {
+ // Stop waiting as priority is updated.
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ }
+
@Test(timeout = 300000)
public void testConfVerificationWithClassloader() throws Exception {
testConfVerification(true, false, false, false);