Repository: helix Updated Branches: refs/heads/master 3da3e319a -> f8ee313ee
Record workflow scheduling history in recurrent workflows. Add records of scheduling history. When deleting a recurrent workflow, also remove all scheduled workflows that are finished. Also add a test case for deleting recurrent workflows that have scheduling history. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f8ee313e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f8ee313e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f8ee313e Branch: refs/heads/master Commit: f8ee313ee5f6e0eb6fcbc584773f9ec3c1b01c6c Parents: 3da3e31 Author: Jiajun Wang <[email protected]> Authored: Fri Jul 28 17:14:39 2017 -0700 Committer: Jiajun Wang <[email protected]> Committed: Wed Sep 20 11:24:47 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 41 ++++-- .../org/apache/helix/task/WorkflowContext.java | 69 ++++++---- .../integration/task/TestRecurringJobQueue.java | 134 ++++++++++++------- 3 files changed, 154 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index c922b18..d3dba5b 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -329,13 +329,12 @@ public class TaskDriver { _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT); // Now atomically clear the results - path = - Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE); + path = Joiner.on("/") +
.join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE); updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES); + @Override public ZNRecord update(ZNRecord currentData) { + Map<String, String> states = + currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name()); if (states != null) { states.keySet().removeAll(toRemove); } @@ -505,10 +504,10 @@ public class TaskDriver { .join(TaskConstants.REBALANCER_CONTEXT_ROOT, queueName, TaskUtil.CONTEXT_NODE); DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { + @Override public ZNRecord update(ZNRecord currentData) { if (currentData != null) { - Map<String, String> states = currentData.getMapField(WorkflowContext.JOB_STATES); + Map<String, String> states = + currentData.getMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name()); if (states != null && states.containsKey(namespacedJobName)) { states.keySet().remove(namespacedJobName); } @@ -738,7 +737,21 @@ public class TaskDriver { * @param workflow */ public void delete(String workflow) { + // After set DELETE state, rebalancer may remove the workflow instantly. + // So record context before set DELETE state. + WorkflowContext wCtx = TaskUtil.getWorkflowContext(_propertyStore, workflow); + setWorkflowTargetState(workflow, TargetState.DELETE); + + // Delete all finished scheduled workflows. 
+ if (wCtx != null && wCtx.getScheduledWorkflows() != null) { + for (String scheduledWorkflow : wCtx.getScheduledWorkflows()) { + WorkflowContext scheduledWorkflowCtx = TaskUtil.getWorkflowContext(_propertyStore, scheduledWorkflow); + if (scheduledWorkflowCtx != null && scheduledWorkflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) { + setWorkflowTargetState(scheduledWorkflow, TargetState.DELETE); + } + } + } } /** @@ -756,15 +769,17 @@ public class TaskDriver { } } - /** Helper function to change target state for a given workflow */ + /** + * Helper function to change target state for a given workflow + */ private void setSingleWorkflowTargetState(String workflowName, final TargetState state) { LOG.info("Set " + workflowName + " to target state " + state); DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { + @Override public ZNRecord update(ZNRecord currentData) { if (currentData != null) { // Only update target state for non-completed workflows - String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); + String finishTime = currentData + .getSimpleField(WorkflowContext.WorkflowContextProperties.FINISH_TIME.name()); if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.TargetState.name(), state.name()); http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java index 9c1f77a..cc21ce3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java @@ -19,9 +19,7 @@ package org.apache.helix.task; * under the 
License. */ -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; +import java.util.*; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; @@ -31,11 +29,14 @@ import org.apache.helix.ZNRecord; * property store */ public class WorkflowContext extends HelixProperty { - public static final String WORKFLOW_STATE = "STATE"; - public static final String START_TIME = "START_TIME"; - public static final String FINISH_TIME = "FINISH_TIME"; - public static final String JOB_STATES = "JOB_STATES"; - public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW"; + protected enum WorkflowContextProperties { + STATE, + START_TIME, + FINISH_TIME, + JOB_STATES, + LAST_SCHEDULED_WORKFLOW, + SCHEDULED_WORKFLOWS, + } public static final int UNSTARTED = -1; public static final int UNFINISHED = -1; @@ -44,16 +45,18 @@ public class WorkflowContext extends HelixProperty { } public void setWorkflowState(TaskState s) { - if (_record.getSimpleField(WORKFLOW_STATE) == null) { - _record.setSimpleField(WORKFLOW_STATE, s.name()); - } else if (!_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.FAILED.name()) - && !_record.getSimpleField(WORKFLOW_STATE).equals(TaskState.COMPLETED.name())) { - _record.setSimpleField(WORKFLOW_STATE, s.name()); + if (_record.getSimpleField(WorkflowContextProperties.STATE.name()) == null) { + _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name()); + } else if (!_record.getSimpleField(WorkflowContextProperties.STATE.name()) + .equals(TaskState.FAILED.name()) && !_record + .getSimpleField(WorkflowContextProperties.STATE.name()) + .equals(TaskState.COMPLETED.name())) { + _record.setSimpleField(WorkflowContextProperties.STATE.name(), s.name()); } } public TaskState getWorkflowState() { - String s = _record.getSimpleField(WORKFLOW_STATE); + String s = _record.getSimpleField(WorkflowContextProperties.STATE.name()); if (s == null) { return null; } @@ -62,16 +65,16 @@ public class 
WorkflowContext extends HelixProperty { } public void setJobState(String jobResource, TaskState s) { - Map<String, String> states = _record.getMapField(JOB_STATES); + Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name()); if (states == null) { - states = new TreeMap<String, String>(); - _record.setMapField(JOB_STATES, states); + states = new TreeMap<>(); + _record.setMapField(WorkflowContextProperties.JOB_STATES.name(), states); } states.put(jobResource, s.name()); } public TaskState getJobState(String jobResource) { - Map<String, String> states = _record.getMapField(JOB_STATES); + Map<String, String> states = _record.getMapField(WorkflowContextProperties.JOB_STATES.name()); if (states == null) { return null; } @@ -85,8 +88,8 @@ public class WorkflowContext extends HelixProperty { } public Map<String, TaskState> getJobStates() { - Map<String, TaskState> jobStates = new HashMap<String, TaskState>(); - Map<String, String> stateFieldMap = _record.getMapField(JOB_STATES); + Map<String, TaskState> jobStates = new HashMap<>(); + Map<String, String> stateFieldMap = _record.getMapField(WorkflowContextProperties.JOB_STATES.name()); if (stateFieldMap != null) { for (Map.Entry<String, String> state : stateFieldMap.entrySet()) { jobStates.put(state.getKey(), TaskState.valueOf(state.getValue())); @@ -97,11 +100,11 @@ public class WorkflowContext extends HelixProperty { } public void setStartTime(long t) { - _record.setSimpleField(START_TIME, String.valueOf(t)); + _record.setSimpleField(WorkflowContextProperties.START_TIME.name(), String.valueOf(t)); } public long getStartTime() { - String tStr = _record.getSimpleField(START_TIME); + String tStr = _record.getSimpleField(WorkflowContextProperties.START_TIME.name()); if (tStr == null) { return -1; } @@ -110,11 +113,11 @@ public class WorkflowContext extends HelixProperty { } public void setFinishTime(long t) { - _record.setSimpleField(FINISH_TIME, String.valueOf(t)); + 
_record.setSimpleField(WorkflowContextProperties.FINISH_TIME.name(), String.valueOf(t)); } public long getFinishTime() { - String tStr = _record.getSimpleField(FINISH_TIME); + String tStr = _record.getSimpleField(WorkflowContextProperties.FINISH_TIME.name()); if (tStr == null) { return UNFINISHED; } @@ -122,11 +125,23 @@ public class WorkflowContext extends HelixProperty { return Long.parseLong(tStr); } - public void setLastScheduledSingleWorkflow(String wf) { - _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf); + public void setLastScheduledSingleWorkflow(String workflow) { + _record.setSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name(), workflow); + // Record scheduled workflow into the history list as well + List<String> workflows = getScheduledWorkflows(); + if (workflows == null) { + workflows = new ArrayList<>(); + _record.setListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name(), workflows); + } + workflows.add(workflow); } public String getLastScheduledSingleWorkflow() { - return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW); + return _record.getSimpleField(WorkflowContextProperties.LAST_SCHEDULED_WORKFLOW.name()); } + + public List<String> getScheduledWorkflows() { + return _record.getListField(WorkflowContextProperties.SCHEDULED_WORKFLOWS.name()); + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/f8ee313e/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index 4983ed3..a1070d8 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -50,19 +50,7 @@ public class TestRecurringJobQueue extends 
TaskTestBase { // Create a queue LOG.info("Starting job-queue: " + queueName); JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName); - // Create and Enqueue jobs - List<String> currentJobNames = new ArrayList<String>(); - for (int i = 0; i <= 1; i++) { - String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; - - JobConfig.Builder jobConfig = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet(targetPartition)); - String jobName = targetPartition.toLowerCase() + "Job" + i; - queueBuild.enqueueJob(jobName, jobConfig); - currentJobNames.add(jobName); - } + List<String> currentJobNames = createAndEnqueueJob(queueBuild, 2); _driver.start(queueBuild.build()); @@ -79,17 +67,7 @@ public class TestRecurringJobQueue extends TaskTestBase { JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5); currentJobNames.clear(); - for (int i = 0; i <= 1; i++) { - String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; - - JobConfig.Builder job = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet(targetPartition)); - String jobName = targetPartition.toLowerCase() + "Job" + i; - queueBuilder.enqueueJob(jobName, job); - currentJobNames.add(jobName); - } + currentJobNames = createAndEnqueueJob(queueBuilder, 2); _driver.createQueue(queueBuilder.build()); @@ -115,20 +93,9 @@ public class TestRecurringJobQueue extends TaskTestBase { JobQueue.Builder queueBuilder = TaskTestUtil.buildRecurrentJobQueue(queueName, 5); // Create and Enqueue jobs - List<String> currentJobNames = new ArrayList<String>(); Map<String, String> commandConfig = ImmutableMap.of(MockTask.TIMEOUT_CONFIG, String.valueOf(500)); Thread.sleep(100); - for (int i = 0; i <= 4; i++) { - String targetPartition = (i == 0) ? 
"MASTER" : "SLAVE"; - - JobConfig.Builder job = new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setJobCommandConfigMap(commandConfig).setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet(targetPartition)); - String jobName = targetPartition.toLowerCase() + "Job" + i; - LOG.info("Enqueuing job: " + jobName); - queueBuilder.enqueueJob(jobName, job); - currentJobNames.add(i, jobName); - } + List<String> currentJobNames = createAndEnqueueJob(queueBuilder, 5); _driver.createQueue(queueBuilder.build()); WorkflowContext wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); @@ -258,19 +225,7 @@ public class TestRecurringJobQueue extends TaskTestBase { LOG.info("Starting job-queue: " + queueName); JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 600000, TargetState.STOP); - // Create and Enqueue jobs - List<String> currentJobNames = new ArrayList<String>(); - for (int i = 0; i <= 1; i++) { - String targetPartition = (i == 0) ? 
"MASTER" : "SLAVE"; - - JobConfig.Builder jobConfig = - new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet(targetPartition)); - String jobName = targetPartition.toLowerCase() + "Job" + i; - queueBuild.enqueueJob(jobName, jobConfig); - currentJobNames.add(jobName); - } + createAndEnqueueJob(queueBuild, 2); _driver.createQueue(queueBuild.build()); WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName); @@ -287,6 +242,69 @@ public class TestRecurringJobQueue extends TaskTestBase { } @Test + public void testDeletingRecurrentQueueWithHistory() throws Exception { + final String queueName = TestHelper.getTestMethodName(); + int intervalSeconds = 3; + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue.Builder queueBuild = TaskTestUtil.buildRecurrentJobQueue(queueName, 0, 60, + TargetState.STOP); + createAndEnqueueJob(queueBuild, 2); + + _driver.createQueue(queueBuild.build()); + WorkflowConfig workflowConfig = _driver.getWorkflowConfig(queueName); + Assert.assertEquals(workflowConfig.getTargetState(), TargetState.STOP); + + // reset interval to a smaller number so as to accelerate test + workflowConfig.putSimpleConfig(WorkflowConfig.WorkflowConfigProperty.RecurrenceInterval.name(), + "" + intervalSeconds); + _driver.updateWorkflow(queueName, workflowConfig); + + _driver.resume(queueName); + + WorkflowContext wCtx; + // wait until at least 2 workflows are scheduled based on template queue + do { + Thread.sleep(intervalSeconds); + wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); + } while (wCtx.getScheduledWorkflows().size() < 2); + + // Stop recurring workflow + _driver.stop(queueName); + + // Record all scheduled workflows + wCtx = TaskTestUtil.pollForWorkflowContext(_driver, queueName); + final List<String> scheduledWorkflows = new ArrayList<>(wCtx.getScheduledWorkflows()); + + // Delete recurrent workflow + 
_driver.delete(queueName); + + // Wait until everything are cleaned up + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override public boolean verify() throws Exception { + WorkflowContext currentQueueCtx = _driver.getWorkflowContext(queueName); + if (currentQueueCtx == null) { + // Queue is removed. Check the recorded scheduledWorkflows. + for (String workflow : scheduledWorkflows) { + if (_driver.getWorkflowContext(workflow) != null) { + return false; + } + } + return true; + } else { + // Queue is not removed yet, there might be update on the queue. + // Update the workflow list. + scheduledWorkflows.clear(); + scheduledWorkflows.addAll(currentQueueCtx.getScheduledWorkflows()); + } + return false; + } + }, 5 * 1000); + Assert.assertTrue(result); + } + + @Test public void testGetNoExistWorkflowConfig() { String randomName = "randomJob"; WorkflowConfig workflowConfig = _driver.getWorkflowConfig(randomName); @@ -297,7 +315,6 @@ public class TestRecurringJobQueue extends TaskTestBase { Assert.assertNull(workflowContext); JobContext jobContext = _driver.getJobContext(randomName); Assert.assertNull(jobContext); - } private void verifyJobDeleted(String queueName, String jobName) throws Exception { @@ -308,5 +325,22 @@ public class TestRecurringJobQueue extends TaskTestBase { Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName))); TaskTestUtil.pollForEmptyJobState(_driver, queueName, jobName); } + + private List<String> createAndEnqueueJob(JobQueue.Builder queueBuild, int jobCount) { + List<String> currentJobNames = new ArrayList<String>(); + for (int i = 0; i < jobCount; i++) { + String targetPartition = (i == 0) ? 
"MASTER" : "SLAVE"; + + JobConfig.Builder jobConfig = + new JobConfig.Builder().setCommand(MockTask.TASK_COMMAND) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + queueBuild.enqueueJob(jobName, jobConfig); + currentJobNames.add(jobName); + } + Assert.assertEquals(currentJobNames.size(), jobCount); + return currentJobNames; + } }
