Repository: helix Updated Branches: refs/heads/helix-0.6.x 49ceac0e9 -> a80ba8ebd
[HELIX-583] support deleting recurring job queue Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a80ba8eb Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a80ba8eb Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a80ba8eb Branch: refs/heads/helix-0.6.x Commit: a80ba8ebd9d7b2a7aded4339867c9f53a36fdfe8 Parents: 49ceac0 Author: zzhang <[email protected]> Authored: Fri Mar 20 01:52:41 2015 -0700 Committer: zzhang <[email protected]> Committed: Fri Mar 20 01:52:41 2015 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/TaskRebalancer.java | 36 ++++---- .../java/org/apache/helix/task/TaskUtil.java | 6 +- .../org/apache/helix/task/WorkflowConfig.java | 10 +++ .../task/TestTaskRebalancerStopResume.java | 87 +++++++++++++++++++- .../apache/helix/integration/task/TestUtil.java | 24 +++++- 5 files changed, 140 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index 1c7a7a3..fe3f496 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -559,6 +559,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) { return; } + LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource); // For workflows not yet scheduled, schedule them and record it RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); @@ -664,6 +665,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { */ private static void cleanup(HelixManager mgr, final String resourceName, WorkflowConfig cfg, String workflowResource) { + LOG.info("Cleaning up job: " + resourceName + " in workflow: " + workflowResource); HelixDataAccessor accessor = mgr.getHelixDataAccessor(); // Remove any DAG references in workflow @@ -684,7 +686,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { try { currentData.setSimpleField(WorkflowConfig.DAG, jobDag.toJson()); } catch (Exception e) { - LOG.equals("Could not update DAG for job " + resourceName); + LOG.equals("Could not update DAG for job: " + resourceName); } return currentData; } @@ -695,28 +697,31 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Delete resource configs. PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName); if (!accessor.removeProperty(cfgKey)) { - throw new RuntimeException( - String - .format( - "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.", - resourceName, cfgKey)); + throw new RuntimeException(String.format( + "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.", + resourceName, + cfgKey)); } + // Delete property store information for this resource. + // For recurring workflow, it's OK if the node doesn't exist. String propStoreKey = getRebalancerPropStoreKey(resourceName); - if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) { - throw new RuntimeException( - String - .format( - "Error occurred while trying to clean up job %s. Failed to remove node %s from Helix. Aborting further clean up steps.", - resourceName, propStoreKey)); - } - // Finally, delete the ideal state itself. + mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT); + + // Delete the ideal state itself. PropertyKey isKey = getISPropertyKey(accessor, resourceName); if (!accessor.removeProperty(isKey)) { throw new RuntimeException(String.format( "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.", resourceName, isKey)); } + + // Delete dead external view + // because job is already completed, there is no more current state change + // thus dead external views removal will not be triggered + PropertyKey evKey = accessor.keyBuilder().externalView(resourceName); + accessor.removeProperty(evKey); + LOG.info(String.format("Successfully cleaned up job resource %s.", resourceName)); boolean lastInWorkflow = true; @@ -727,11 +732,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { || accessor.getProperty(getConfigPropertyKey(accessor, job)) != null || accessor.getProperty(getISPropertyKey(accessor, job)) != null) { lastInWorkflow = false; + break; } } // clean up workflow-level info if this was the last in workflow - if (lastInWorkflow && cfg.isTerminable()) { + if (lastInWorkflow && (cfg.isTerminable() || cfg.getTargetState() == TargetState.DELETE)) { // delete workflow config PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource); if (!accessor.removeProperty(workflowCfgKey)) { http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 4f6afe0..a37dd6f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -345,7 +345,11 @@ public class TaskUtil { public static void invokeRebalance(HelixManager manager, String resource) { // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run HelixDataAccessor accessor = manager.getHelixDataAccessor(); - accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource)); + PropertyKey key = accessor.keyBuilder().idealStates(resource); + IdealState is = accessor.getProperty(key); + if (is != null) { + accessor.updateProperty(key, is); + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index 6bc5181..4129fea 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -24,6 +24,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; /** * Provides a typed interface to workflow level configurations. Validates the configurations. @@ -82,6 +83,10 @@ public class WorkflowConfig { return _scheduleConfig; } + public boolean isRecurring() { + return _scheduleConfig != null && _scheduleConfig.isRecurring(); + } + public Map<String, String> getResourceConfigMap() throws Exception { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.DAG, getJobDag().toJson()); @@ -124,6 +129,11 @@ public class WorkflowConfig { return this; } + public Builder setExpiry(long v, TimeUnit unit) { + _expiry = unit.toMillis(v); + return this; + } + public Builder setExpiry(long v) { _expiry = v; return this; http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java index 9f72363..fd709d8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -29,22 +29,28 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathConfig; +import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.integration.ZkIntegrationTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobContext; import org.apache.helix.task.JobDag; import org.apache.helix.task.JobQueue; +import org.apache.helix.task.ScheduleConfig; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskConstants; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskResult; @@ -56,6 +62,7 @@ import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.util.PathUtils; import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -259,7 +266,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { @Test - public void stopDeleteAndResumeNamedQueue() throws Exception { + public void stopDeleteJobAndResumeNamedQueue() throws Exception { String queueName = TestHelper.getTestMethodName(); // Create a queue @@ -307,8 +314,10 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { // stop the queue LOG.info("Pausing job-queue: " + queueName); _driver.stop(queueName); - TestUtil.pollForJobState(_manager, queueName, - String.format("%s_%s", queueName, currentJobNames.get(1)), TaskState.STOPPED); + TestUtil.pollForJobState(_manager, + queueName, + String.format("%s_%s", queueName, currentJobNames.get(1)), + TaskState.STOPPED); TestUtil.pollForWorkflowState(_manager, queueName, TaskState.STOPPED); // Ensure job 3 is not started before deleting it @@ -372,7 +381,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { } @Test - public void stopDeleteAndResumeRecurrentNamedQueue() throws Exception { + public void stopDeleteJobAndResumeRecurrentQueue() throws Exception { String queueName = TestHelper.getTestMethodName(); // Create a queue @@ -459,6 +468,76 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { // verify the job is not there for the next recurrence of queue schedule } + @Test + public void stopAndDeleteQueue() throws Exception { + final String queueName = TestHelper.getTestMethodName(); + + // Create a queue + System.out.println("START " + queueName + " at " + new Date(System.currentTimeMillis())); + WorkflowConfig wfCfg + = new WorkflowConfig.Builder().setExpiry(2, TimeUnit.MINUTES) + .setScheduleConfig(ScheduleConfig.recurringFromNow(TimeUnit.MINUTES, 1)).build(); + JobQueue qCfg = new JobQueue.Builder(queueName).fromMap(wfCfg.getResourceConfigMap()).build(); + _driver.createQueue(qCfg); + + // Enqueue 2 jobs + Set<String> master = Sets.newHashSet("MASTER"); + JobConfig.Builder job1 = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(master); + String job1Name = "masterJob"; + LOG.info("Enqueuing job1: " + job1Name); + _driver.enqueueJob(queueName, job1Name, job1); + + Set<String> slave = Sets.newHashSet("SLAVE"); + JobConfig.Builder job2 = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB).setTargetPartitionStates(slave); + String job2Name = "slaveJob"; + LOG.info("Enqueuing job2: " + job2Name); + _driver.enqueueJob(queueName, job2Name, job2); + + String namespacedJob1 = String.format("%s_%s", queueName, job1Name); + TestUtil.pollForJobState(_manager, queueName, namespacedJob1, TaskState.COMPLETED); + + String namespacedJob2 = String.format("%s_%s", queueName, job2Name); + TestUtil.pollForJobState(_manager, queueName, namespacedJob2, TaskState.COMPLETED); + + // Stop and delete queue + _driver.stop(queueName); + _driver.delete(queueName); + + // Wait until all status are cleaned up + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() throws Exception { + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check paths for resource-config, ideal-state, external-view, property-store + List<String> paths + = Lists.newArrayList(keyBuilder.resourceConfigs().getPath(), + keyBuilder.idealStates().getPath(), + keyBuilder.externalViews().getPath(), + PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, CLUSTER_NAME) + + TaskConstants.REBALANCER_CONTEXT_ROOT); + + for (String path : paths) { + List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0); + for (String childName : childNames) { + if (childName.startsWith(queueName)) { + return false; + } + } + } + + return true; + } + }, 30 * 1000); + Assert.assertTrue(result); + + System.out.println("END " + queueName + " at " + new Date(System.currentTimeMillis())); + } + private void verifyJobDeleted(String queueName, String jobName) throws Exception { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); http://git-wip-us.apache.org/repos/asf/helix/blob/a80ba8eb/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java index 27e827a..43c5783 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java @@ -23,6 +23,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.TestHelper; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; import org.apache.helix.task.WorkflowContext; import org.testng.Assert; @@ -56,13 +57,30 @@ public class TestUtil { public static void pollForJobState(HelixManager manager, String workflowResource, String jobName, TaskState state) throws InterruptedException { - // Wait for completion. - long st = System.currentTimeMillis(); + // Get workflow config + WorkflowConfig wfCfg = TaskUtil.getWorkflowCfg(manager, workflowResource); + Assert.assertNotNull(wfCfg); WorkflowContext ctx; + if (wfCfg.isRecurring()) { + // if it's recurring, need to reconstruct workflow and job name + do { + Thread.sleep(100); + ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + } while ((ctx == null || ctx.getLastScheduledSingleWorkflow() == null)); + Assert.assertNotNull(ctx); + Assert.assertNotNull(ctx.getLastScheduledSingleWorkflow()); + jobName = jobName.substring(workflowResource.length() + 1); + workflowResource = ctx.getLastScheduledSingleWorkflow(); + jobName = String.format("%s_%s", workflowResource, jobName); + } + + // Wait for state + long st = System.currentTimeMillis(); do { Thread.sleep(100); ctx = TaskUtil.getWorkflowContext(manager, workflowResource); - } while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state) + } + while ((ctx == null || ctx.getJobState(jobName) == null || ctx.getJobState(jobName) != state) && System.currentTimeMillis() < st + _default_timeout); Assert.assertNotNull(ctx); Assert.assertEquals(ctx.getJobState(jobName), state);
