Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 516faa0c5 -> e94a9f5f9


[HELIX-589] Delete job API throws NPE if the job does not exist in the last 
scheduled workflow


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e94a9f5f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e94a9f5f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e94a9f5f

Branch: refs/heads/helix-0.6.x
Commit: e94a9f5f90099a248181d6dc50314aec0e8d9512
Parents: 516faa0
Author: Congrui Ji <[email protected]>
Authored: Thu Apr 2 15:40:09 2015 -0700
Committer: Congrui Ji <[email protected]>
Committed: Thu Apr 2 15:40:09 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 50 ++++++-------
 .../task/TestTaskRebalancerStopResume.java      | 79 ++++++++++++++++----
 2 files changed, 90 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e94a9f5f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index f0670cf..cb079cc 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -369,37 +369,35 @@ public class TaskDriver {
         JobDag jobDag = 
JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG));
         Set<String> allNodes = jobDag.getAllNodes();
         if (!allNodes.contains(namespacedJobName)) {
-          throw new IllegalStateException("Could not delete job from queue " + 
queueName + ", job "
-              + jobName + " not exists");
-        }
-
-        String parent = null;
-        String child = null;
-        // remove the node from the queue
-        for (String node : allNodes) {
-          if (!node.equals(namespacedJobName)) {
-            if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
-              parent = node;
-              jobDag.removeParentToChild(parent, namespacedJobName);
-            } else if 
(jobDag.getDirectParents(node).contains(namespacedJobName)) {
-              child = node;
-              jobDag.removeParentToChild(namespacedJobName, child);
+          LOG.warn("Could not delete job from queue " + queueName + ", job " + 
jobName + " not exists");
+        } else {
+          String parent = null;
+          String child = null;
+          // remove the node from the queue
+          for (String node : allNodes) {
+            if (!node.equals(namespacedJobName)) {
+              if (jobDag.getDirectChildren(node).contains(namespacedJobName)) {
+                parent = node;
+                jobDag.removeParentToChild(parent, namespacedJobName);
+              } else if 
(jobDag.getDirectParents(node).contains(namespacedJobName)) {
+                child = node;
+                jobDag.removeParentToChild(namespacedJobName, child);
+              }
             }
           }
-        }
 
-        if (parent != null && child != null) {
-          jobDag.addParentToChild(parent, child);
-        }
+          if (parent != null && child != null) {
+            jobDag.addParentToChild(parent, child);
+          }
 
-        jobDag.removeNode(namespacedJobName);
+          jobDag.removeNode(namespacedJobName);
 
-        // Save the updated DAG
-        try {
-          currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
-        } catch (Exception e) {
-          throw new IllegalStateException(
-              "Could not remove job " + jobName + " from DAG of queue " + 
queueName, e);
+          // Save the updated DAG
+          try {
+            currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson());
+          } catch (Exception e) {
+            throw new IllegalStateException("Could not remove job " + jobName 
+ " from DAG of queue " + queueName, e);
+          }
         }
         return currentData;
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/e94a9f5f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index aed4088..8a44672 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -220,8 +220,8 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
 
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
-    new JobConfig.Builder().setCommand("Reindex")
-        
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+        new JobConfig.Builder().setCommand("Reindex")
+            
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
@@ -315,9 +315,9 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
     LOG.info("Pausing job-queue: " + queueName);
     _driver.stop(queueName);
     TestUtil.pollForJobState(_manager,
-                             queueName,
-                             String.format("%s_%s", queueName, 
currentJobNames.get(1)),
-                             TaskState.STOPPED);
+        queueName,
+        String.format("%s_%s", queueName, currentJobNames.get(1)),
+        TaskState.STOPPED);
     TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED);
 
     // Ensure job 3 is not started before deleting it
@@ -373,7 +373,8 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
   {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(50000));
-    cfgMap.put(WorkflowConfig.START_TIME, 
WorkflowConfig.getDefaultDateFormat().format(Calendar.getInstance().getTime()));
+    cfgMap.put(WorkflowConfig.START_TIME,
+        
WorkflowConfig.getDefaultDateFormat().format(Calendar.getInstance().getTime()));
     cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60));
     cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS");
     return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build();
@@ -468,6 +469,58 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
   }
 
   @Test
+  public void deleteJobFromRecurrentQueueNotStarted() throws Exception {
+    String queueName = TestHelper.getTestMethodName();
+
+    // Create a queue
+    LOG.info("Starting job-queue: " + queueName);
+    JobQueue queue = buildRecurrentJobQueue(queueName);
+    _driver.createQueue(queue);
+
+    // create jobs
+    List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>();
+    List<String> jobNames = new ArrayList<String>();
+    Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, 
String.valueOf(500));
+
+    final int JOB_COUNTS = 3;
+
+    for (int i = 0; i < JOB_COUNTS; i++) {
+      String targetPartition = (i == 0) ? "MASTER" : "SLAVE";
+
+      JobConfig.Builder job =
+          new 
JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig)
+              .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+              .setTargetPartitionStates(Sets.newHashSet(targetPartition));
+      jobs.add(job);
+      jobNames.add(targetPartition.toLowerCase() + "Job" + i);
+    }
+
+    // enqueue all jobs except last one
+    for (int i = 0; i < JOB_COUNTS - 1; ++i) {
+      LOG.info("Enqueuing job: " + jobNames.get(i));
+      _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i));
+    }
+    String currentLastJob = jobNames.get(JOB_COUNTS - 2);
+
+    WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, 
queueName);
+    String scheduledQueue = wCtx.getLastScheduledSingleWorkflow();
+
+    // ensure all jobs are finished
+    String namedSpaceJob = String.format("%s_%s", scheduledQueue, 
currentLastJob);
+    TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, 
TaskState.COMPLETED);
+
+    // enqueue the last job
+    LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1));
+    _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), 
jobs.get(JOB_COUNTS - 1));
+
+    // remove the last job
+    _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1));
+
+    // verify
+    verifyJobDeleted(queueName, String.format("%s_%s", scheduledQueue, 
jobNames.get(JOB_COUNTS - 1)));
+  }
+
+  @Test
   public void stopAndDeleteQueue() throws Exception {
     final String queueName = TestHelper.getTestMethodName();
 
@@ -475,7 +528,7 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
     System.out.println("START " + queueName + " at " + new 
Date(System.currentTimeMillis()));
     WorkflowConfig wfCfg
         = new WorkflowConfig.Builder().setExpiry(2, TimeUnit.MINUTES)
-                                      
.setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 
1)).build();
+        .setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 
1)).build();
     JobQueue qCfg = new 
JobQueue.Builder(queueName).fromMap(wfCfg.getResourceConfigMap()).build();
     _driver.createQueue(qCfg);
 
@@ -483,7 +536,7 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
     Set<String> master = Sets.newHashSet("MASTER");
     JobConfig.Builder job1 =
         new JobConfig.Builder().setCommand("Reindex")
-                               
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
+            
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master);
     String job1Name = "masterJob";
     LOG.info("Enqueuing job1: " + job1Name);
     _driver.enqueueJob(queueName, job1Name, job1);
@@ -491,7 +544,7 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
     Set<String> slave = Sets.newHashSet("SLAVE");
     JobConfig.Builder job2 =
         new JobConfig.Builder().setCommand("Reindex")
-                               
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
+            
.setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave);
     String job2Name = "slaveJob";
     LOG.info("Enqueuing job2: " + job2Name);
     _driver.enqueueJob(queueName, job2Name, job2);
@@ -522,10 +575,10 @@ public class TestTaskRebalancerStopResume extends 
ZkIntegrationTestBase {
         // check paths for resource-config, ideal-state, external-view, 
property-store
         List<String> paths
             = Lists.newArrayList(keyBuilder.resourceConfigs().getPath(),
-                                 keyBuilder.idealStates().getPath(),
-                                 keyBuilder.externalViews().getPath(),
-                                 
PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, CLUSTER_NAME)
-                                     + TaskConstants.REBALANCER_CONTEXT_ROOT);
+            keyBuilder.idealStates().getPath(),
+            keyBuilder.externalViews().getPath(),
+            PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, 
CLUSTER_NAME)
+                + TaskConstants.REBALANCER_CONTEXT_ROOT);
 
         for (String path : paths) {
           List<String> childNames = 
accessor.getBaseDataAccessor().getChildNames(path, 0);

Reply via email to the mailing list.