This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 00eb678 Avoid adding JobConfig if queue has reached its capacity limit (#1064)
00eb678 is described below
commit 00eb6786d3430fe31f313874dea96b78309c4f51
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Fri Jun 5 15:27:58 2020 -0700
Avoid adding JobConfig if queue has reached its capacity limit (#1064)
Avoid adding JobConfig if queue has reached its capacity limit
In this commit, the necessary check has been added on the TaskDriver side
to fail the creation of a JobConfig if the queue has reached its capacity limit.
---
.../java/org/apache/helix/task/TaskDriver.java | 15 ++++++++
.../helix/integration/task/TestEnqueueJobs.java | 43 ++++++++++++++++++++++
2 files changed, 58 insertions(+)
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cbcaa70..73e23e2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -403,6 +403,14 @@ public class TaskDriver {
}
}
+ // Fail the operation if adding new jobs will cause the queue to reach its
capacity limit
+ workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+ if (workflowConfig.getJobDag().size() + jobs.size() >= capacity) {
+ throw new IllegalStateException(
+ String.format("Queue %s already reaches its max capacity %d, failed
to add %s", queue,
+ capacity, jobs.toString()));
+ }
+
validateZKNodeLimitation(1);
final List<JobConfig> jobConfigs = new ArrayList<>();
final List<String> namespacedJobNames = new ArrayList<>();
@@ -442,6 +450,13 @@ public class TaskDriver {
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+ // Remove previously added jobConfigs if adding new jobs will cause
exceeding capacity
+ // limit. Removing the job configs is necessary to avoid multiple
threads adding jobs at the
+ // same time and cause overcapacity queue
+ for (String job : jobs) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+ TaskUtil.removeJobConfig(_accessor, namespacedJobName);
+ }
throw new IllegalStateException(
String.format("Queue %s already reaches its max capacity %d,
failed to add %s", queue,
capacity, jobs.toString()));
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
index 4d88a5a..d05489b 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -159,4 +159,47 @@ public class TestEnqueueJobs extends TaskTestBase {
}
Assert.assertTrue(minFinishTime > maxStartTime);
}
+
+ @Test // Verifies enqueueJob rejects a job that would make the queue reach its capacity, and that no JobConfig is left behind
+ public void testQueueJobsMaxCapacity() throws InterruptedException {
+ final int numberOfJobsAddedInitially = 4;
+ final int queueCapacity = 5;
+ final String newJobName = "NewJob";
+ String queueName = TestHelper.getTestMethodName();
+ JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+ WorkflowConfig.Builder workflowCfgBuilder =
+ new
WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1)
+ .setAllowOverlapJobAssignment(true).setCapacity(queueCapacity);
+
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+ JobConfig.Builder jobBuilder =
+ new
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+ .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "1000"));
+
+ // Add numberOfJobsAddedInitially (4) jobs, JOB0 to JOB3, to the queue
+ for (int i = 0; i < numberOfJobsAddedInitially; i++) {
+ _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+ }
+
+ // Wait until all of the enqueued jobs (JOB0 to JOB3) are COMPLETED; they still count toward the queue's DAG size
+ for (int i = 0; i < numberOfJobsAddedInitially; i++) {
+ _driver.pollForJobState(queueName,
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+ TaskState.COMPLETED);
+ }
+
+ boolean exceptionHappenedWhileAddingNewJob = false; // set only if the enqueueJob call below throws
+ try {
+ // This call is expected to throw because 4 jobs have already been
added
+ // and 4 + 1 new job >= the queue capacity of 5, so adding it would hit the capacity limit
+ _driver.enqueueJob(queueName, newJobName, jobBuilder);
+ } catch (Exception e) { // NOTE(review): broad catch; the capacity check presumably surfaces as IllegalStateException — consider catching that type so unrelated failures cannot pass this test
+ exceptionHappenedWhileAddingNewJob = true;
+ }
+ Assert.assertTrue(exceptionHappenedWhileAddingNewJob);
+
+ // Make sure that the rejected job's JobConfig was not created (or was removed after the capacity check failed)
+ JobConfig jobConfig =
+ _driver.getJobConfig(TaskUtil.getNamespacedJobName(queueName,
newJobName));
+ Assert.assertNull(jobConfig);
+ }
}