This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 00eb678  Avoid adding JobConfig if queue has reached its capacity 
limit (#1064)
00eb678 is described below

commit 00eb6786d3430fe31f313874dea96b78309c4f51
Author: Ali Reza Zamani Zadeh Najari <[email protected]>
AuthorDate: Fri Jun 5 15:27:58 2020 -0700

    Avoid adding JobConfig if queue has reached its capacity limit (#1064)
    
    Avoid adding JobConfig if queue has reached its capacity limit
    
    In this commit, the necessary check has been added on the TaskDriver side
    to fail the creation of a JobConfig if the queue has reached its capacity limit.
---
 .../java/org/apache/helix/task/TaskDriver.java     | 15 ++++++++
 .../helix/integration/task/TestEnqueueJobs.java    | 43 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index cbcaa70..73e23e2 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -403,6 +403,14 @@ public class TaskDriver {
       }
     }
 
+    // Fail the operation if adding the new jobs would cause the queue to reach
+    // its capacity limit
+    workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
+    if (workflowConfig.getJobDag().size() + jobs.size() >= capacity) {
+      throw new IllegalStateException(
+          String.format("Queue %s already reaches its max capacity %d, failed 
to add %s", queue,
+              capacity, jobs.toString()));
+    }
+
     validateZKNodeLimitation(1);
     final List<JobConfig> jobConfigs = new ArrayList<>();
     final List<String> namespacedJobNames = new ArrayList<>();
@@ -442,6 +450,13 @@ public class TaskDriver {
           
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
       Set<String> allNodes = jobDag.getAllNodes();
       if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+        // Remove the previously added jobConfigs if adding the new jobs would
+        // cause the capacity limit to be exceeded. Removing the job configs is
+        // necessary to avoid multiple threads adding jobs at the same time and
+        // leaving the queue over capacity
+        for (String job : jobs) {
+          String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+          TaskUtil.removeJobConfig(_accessor, namespacedJobName);
+        }
         throw new IllegalStateException(
             String.format("Queue %s already reaches its max capacity %d, 
failed to add %s", queue,
                 capacity, jobs.toString()));
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
index 4d88a5a..d05489b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
@@ -159,4 +159,47 @@ public class TestEnqueueJobs extends TaskTestBase {
     }
     Assert.assertTrue(minFinishTime > maxStartTime);
   }
+
+  @Test
+  public void testQueueJobsMaxCapacity() throws InterruptedException {
+    final int numberOfJobsAddedInitially = 4;
+    final int queueCapacity = 5;
+    final String newJobName = "NewJob";
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder =
+        new 
WorkflowConfig.Builder().setWorkflowId(queueName).setParallelJobs(1)
+            .setAllowOverlapJobAssignment(true).setCapacity(queueCapacity);
+    
_driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            
.setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "1000"));
+
+    // Add 4 jobs to the queue
+    for (int i = 0; i < numberOfJobsAddedInitially; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+    for (int i = 0; i < numberOfJobsAddedInitially; i++) {
+      _driver.pollForJobState(queueName, 
TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+          TaskState.COMPLETED);
+    }
+
+    boolean exceptionHappenedWhileAddingNewJob = false;
+    try {
+      // This call will produce the exception because 4 jobs have been already 
added
+      // By adding the new job the queue will hit its capacity limit
+      _driver.enqueueJob(queueName, newJobName, jobBuilder);
+    } catch (Exception e) {
+      exceptionHappenedWhileAddingNewJob = true;
+    }
+    Assert.assertTrue(exceptionHappenedWhileAddingNewJob);
+
+    // Make sure that jobConfig has not been created
+    JobConfig jobConfig =
+        _driver.getJobConfig(TaskUtil.getNamespacedJobName(queueName, 
newJobName));
+    Assert.assertNull(jobConfig);
+  }
 }

Reply via email to