Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 516faa0c5 -> e94a9f5f9
[HELIX-589] Delete job API throws NPE if the job does not exist in last scheduled workflow Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e94a9f5f Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e94a9f5f Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e94a9f5f Branch: refs/heads/helix-0.6.x Commit: e94a9f5f90099a248181d6dc50314aec0e8d9512 Parents: 516faa0 Author: Congrui Ji <[email protected]> Authored: Thu Apr 2 15:40:09 2015 -0700 Committer: Congrui Ji <[email protected]> Committed: Thu Apr 2 15:40:09 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 50 ++++++------- .../task/TestTaskRebalancerStopResume.java | 79 ++++++++++++++++---- 2 files changed, 90 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/e94a9f5f/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index f0670cf..cb079cc 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -369,37 +369,35 @@ public class TaskDriver { JobDag jobDag = JobDag.fromJson(currentData.getSimpleField(WorkflowConfig.DAG)); Set<String> allNodes = jobDag.getAllNodes(); if (!allNodes.contains(namespacedJobName)) { - throw new IllegalStateException("Could not delete job from queue " + queueName + ", job " - + jobName + " not exists"); - } - - String parent = null; - String child = null; - // remove the node from the queue - for (String node : allNodes) { - if (!node.equals(namespacedJobName)) { - if 
(jobDag.getDirectChildren(node).contains(namespacedJobName)) { - parent = node; - jobDag.removeParentToChild(parent, namespacedJobName); - } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) { - child = node; - jobDag.removeParentToChild(namespacedJobName, child); + LOG.warn("Could not delete job from queue " + queueName + ", job " + jobName + " not exists"); + } else { + String parent = null; + String child = null; + // remove the node from the queue + for (String node : allNodes) { + if (!node.equals(namespacedJobName)) { + if (jobDag.getDirectChildren(node).contains(namespacedJobName)) { + parent = node; + jobDag.removeParentToChild(parent, namespacedJobName); + } else if (jobDag.getDirectParents(node).contains(namespacedJobName)) { + child = node; + jobDag.removeParentToChild(namespacedJobName, child); + } } } - } - if (parent != null && child != null) { - jobDag.addParentToChild(parent, child); - } + if (parent != null && child != null) { + jobDag.addParentToChild(parent, child); + } - jobDag.removeNode(namespacedJobName); + jobDag.removeNode(namespacedJobName); - // Save the updated DAG - try { - currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); - } catch (Exception e) { - throw new IllegalStateException( - "Could not remove job " + jobName + " from DAG of queue " + queueName, e); + // Save the updated DAG + try { + currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); + } catch (Exception e) { + throw new IllegalStateException("Could not remove job " + jobName + " from DAG of queue " + queueName, e); + } } return currentData; } http://git-wip-us.apache.org/repos/asf/helix/blob/e94a9f5f/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index aed4088..8a44672 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -220,8 +220,8 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { Set<String> slave = Sets.newHashSet("SLAVE"); JobConfig.Builder job2 = - new JobConfig.Builder().setCommand("Reindex") - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); String job2Name = "slaveJob"; LOG.info("Enqueuing job: " + job2Name); _driver.enqueueJob(queueName, job2Name, job2); @@ -315,9 +315,9 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); TestUtil.pollForJobState(_manager, - queueName, - String.format("%s_%s", queueName, currentJobNames.get(1)), - TaskState.STOPPED); + queueName, + String.format("%s_%s", queueName, currentJobNames.get(1)), + TaskState.STOPPED); TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); // Ensure job 3 is not started before deleting it @@ -373,7 +373,8 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(50000)); - cfgMap.put(WorkflowConfig.START_TIME, WorkflowConfig.getDefaultDateFormat().format(Calendar.getInstance().getTime())); + cfgMap.put(WorkflowConfig.START_TIME, + WorkflowConfig.getDefaultDateFormat().format(Calendar.getInstance().getTime())); cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(60)); cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS"); return (new 
JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build(); @@ -468,6 +469,58 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { } @Test + public void deleteJobFromRecurrentQueueNotStarted() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = buildRecurrentJobQueue(queueName); + _driver.createQueue(queue); + + // create jobs + List<JobConfig.Builder> jobs = new ArrayList<JobConfig.Builder>(); + List<String> jobNames = new ArrayList<String>(); + Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + + final int JOB_COUNTS = 3; + + for (int i = 0; i < JOB_COUNTS; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + jobs.add(job); + jobNames.add(targetPartition.toLowerCase() + "Job" + i); + } + + // enqueue all jobs except last one + for (int i = 0; i < JOB_COUNTS - 1; ++i) { + LOG.info("Enqueuing job: " + jobNames.get(i)); + _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i)); + } + String currentLastJob = jobNames.get(JOB_COUNTS - 2); + + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + + // ensure all jobs are finished + String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); + + // enqueue the last job + LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1)); + _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1)); + + // remove the last job + _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1)); + + // 
verify + verifyJobDeleted(queueName, String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1))); + } + + @Test public void stopAndDeleteQueue() throws Exception { final String queueName = TestHelper.getTestMethodName(); @@ -475,7 +528,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { System.out.println("START " + queueName + " at " + new Date(System.currentTimeMillis())); WorkflowConfig wfCfg = new WorkflowConfig.Builder().setExpiry(2, TimeUnit.MINUTES) - .setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 1)).build(); + .setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 1)).build(); JobQueue qCfg = new JobQueue.Builder(queueName).fromMap(wfCfg.getResourceConfigMap()).build(); _driver.createQueue(qCfg); @@ -483,7 +536,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { Set<String> master = Sets.newHashSet("MASTER"); JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex") - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); String job1Name = "masterJob"; LOG.info("Enqueuing job1: " + job1Name); _driver.enqueueJob(queueName, job1Name, job1); @@ -491,7 +544,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { Set<String> slave = Sets.newHashSet("SLAVE"); JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex") - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); String job2Name = "slaveJob"; LOG.info("Enqueuing job2: " + job2Name); _driver.enqueueJob(queueName, job2Name, job2); @@ -522,10 +575,10 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { // check paths for resource-config, ideal-state, external-view, property-store List<String> paths = 
Lists.newArrayList(keyBuilder.resourceConfigs().getPath(), - keyBuilder.idealStates().getPath(), - keyBuilder.externalViews().getPath(), - PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, CLUSTER_NAME) - + TaskConstants.REBALANCER_CONTEXT_ROOT); + keyBuilder.idealStates().getPath(), + keyBuilder.externalViews().getPath(), + PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, CLUSTER_NAME) + + TaskConstants.REBALANCER_CONTEXT_ROOT); for (String path : paths) { List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
