Repository: helix Updated Branches: refs/heads/helix-0.6.x 947a7d557 -> adfe4dda8
Support user defined content store per workflow/job/task layer 1. Add feature to support workflow/job/task layer key value user defined content store 2. Add test case for workflow/job/task layer key-value pair store and verify. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/adfe4dda Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/adfe4dda Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/adfe4dda Branch: refs/heads/helix-0.6.x Commit: adfe4dda8aaef6b1ea088acaf24bd120f6fe250d Parents: 947a7d5 Author: Junkai Xue <[email protected]> Authored: Thu Aug 18 13:33:31 2016 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed Aug 31 10:54:32 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 3 + .../org/apache/helix/task/TaskStateModel.java | 4 + .../java/org/apache/helix/task/TaskUtil.java | 97 ++++++++ .../org/apache/helix/task/UserContentStore.java | 110 ++++++++++ .../apache/helix/task/WorkflowRebalancer.java | 2 + .../integration/task/TestUserContentStore.java | 219 +++++++++++++++++++ 6 files changed, 435 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index e62d15c..bc582e1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -596,6 +596,9 @@ public class TaskDriver { _admin.addResource(_clusterName, workflow, 1, TaskConstants.STATE_MODEL_NAME); IdealState is = buildWorkflowIdealState(workflow); + TaskUtil + 
.createUserContent(_propertyStore, workflow, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); + _admin.setResourceIdealState(_clusterName, workflow, is); } http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java index fd07176..a7c58d2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModel.java @@ -304,6 +304,10 @@ public class TaskStateModel extends StateModel { TaskFactory taskFactory = _taskFactoryRegistry.get(command); Task task = taskFactory.createNewTask(callbackContext); + if (task instanceof UserContentStore) { + ((UserContentStore) task).init(_manager, cfg.getWorkflow(), msg.getResourceName(), taskPartition); + } + // Submit the task for execution _taskRunner = new TaskRunner(task, msg.getResourceName(), taskPartition, msg.getTgtName(), _manager, http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 44de175..1d89656 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.I0Itec.zkclient.DataUpdater; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; @@ -48,6 +49,7 @@ import com.google.common.collect.Maps; public class TaskUtil { private static final 
Logger LOG = Logger.getLogger(TaskUtil.class); public static final String CONTEXT_NODE = "Context"; + public static final String USER_CONTENT_NODE = "UserContent"; /** * Parses job resource configurations in Helix into a {@link JobConfig} object. @@ -220,6 +222,101 @@ public class TaskUtil { } /** + * Initialize the user content store znode setup + * @param propertyStore zookeeper property store + * @param workflowJobResource the name of workflow or job + * @param record the initial data + */ + protected static void createUserContent(HelixPropertyStore propertyStore, String workflowJobResource, + ZNRecord record) { + propertyStore.create(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, + TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT); + } + + /** + * Get user defined workflow or job level key-value pair data + * + * @param manager a connection to Helix + * @param workflowJobResource the name of workflow or job + * @param key the key of key-value pair + * + * @return null if there is no such pair, otherwise return a String + */ + protected static String getWorkflowJobUserContent(HelixManager manager, + String workflowJobResource, String key) { + ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null, + AccessOption.PERSISTENT); + return r != null ? 
r.getSimpleField(key) : null; + } + + /** + * Add a user-defined key-value pair to workflow or job level + * + * @param manager a connection to Helix + * @param workflowJobResource the name of workflow or job + * @param key the key of key-value pair + * @param value the value of key-value pair + */ + protected static void addWorkflowJobUserContent(final HelixManager manager, + String workflowJobResource, final String key, final String value) { + String path = Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE); + + manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { + @Override public ZNRecord update(ZNRecord znRecord) { + znRecord.setSimpleField(key, value); + return znRecord; + } + }, AccessOption.PERSISTENT); + } + + /** + * Get user defined task level key-value pair data + * + * @param manager a connection to Helix + * @param jobResource the name of job + * @param taskResource the name of the task + * @param key the key of key-value pair + * + * @return null if there is no such pair, otherwise return a String + */ + protected static String getTaskUserContent(HelixManager manager, String jobResource, + String taskResource, String key) { + ZNRecord r = manager.getHelixPropertyStore().get( + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE), + null, AccessOption.PERSISTENT); + return r != null ? (r.getMapField(taskResource) != null + ? 
r.getMapField(taskResource).get(key) + : null) : null; + } + + /** + * Add an user defined key-value pair data to task level + * + * @param manager a connection to Helix + * @param jobResource the name of job + * @param taskResource the name of task + * @param key the key of key-value pair + * @param value the value of key-value pair + */ + protected static void addTaskUserContent(final HelixManager manager, String jobResource, + final String taskResource, final String key, final String value) { + String path = + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, USER_CONTENT_NODE); + + manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { + @Override public ZNRecord update(ZNRecord znRecord) { + if (znRecord.getMapField(taskResource) == null) { + znRecord.setMapField(taskResource, new HashMap<String, String>()); + } + znRecord.getMapField(taskResource).put(key, value); + return znRecord; + } + }, AccessOption.PERSISTENT); + } + /** * Get a workflow-qualified job name for a single-job workflow * * @param singleJobWorkflow the name of the single-job workflow http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java new file mode 100644 index 0000000..b188be7 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java @@ -0,0 +1,110 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; + +/** + * UserContentStore provides default implementation of user defined key-value pair store per task, + * job and workflow level. + * + * TODO: This class should be merged to Task interface when Helix bump up to Java 8 + */ +public abstract class UserContentStore { + + protected enum Scope { + /** + * Define the content store in workflow level + */ + WORKFLOW, + + /** + * Define the content store in job level + */ + JOB, + + /** + * Define the content store in task level + */ + TASK + } + + private HelixManager _manager; + private String _workflowName; + private String _jobName; + private String _taskName; + + /** + * Default initialization of user content store + * @param manager The Helix manager + * @param workflowName The name of workflow that the task belongs to + * @param jobName The name of job that the task belongs to + * @param taskName The name of current task + */ + public void init(HelixManager manager, String workflowName, String jobName, String taskName) { + _manager = manager; + _workflowName = workflowName; + _jobName = jobName; + _taskName = taskName; + } + + /** + * Default implementation for user defined put key-value pair + * @param key The key of key-value pair + * @param value The value of key-value pair + * @param scope The scope defines which layer to store + */ + public 
void putUserContent(String key, String value, Scope scope) { + switch (scope) { + case WORKFLOW: + TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value); + break; + case JOB: + TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value); + break; + case TASK: + TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value); + break; + default: + throw new HelixException("Invalid scope : " + scope.name()); + } + } + + /** + * Default implementation for user defined get key-value pair + * @param key The key of key-value pair + * @param scope The scope defines the layer in which the key-value pair is stored + * @return Null if key-value pair not found or this content store does not exist. Otherwise, + * return a String + */ + public String getUserContent(String key, Scope scope) { + switch (scope) { + case WORKFLOW: + return TaskUtil.getWorkflowJobUserContent(_manager, _workflowName, key); + case JOB: + return TaskUtil.getWorkflowJobUserContent(_manager, _jobName, key); + case TASK: + return TaskUtil.getTaskUserContent(_manager, _jobName, _taskName, key); + default: + throw new HelixException("Invalid scope : " + scope.name()); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java index b4f25d5..b78ee7f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java @@ -164,6 +164,8 @@ public class WorkflowRebalancer extends TaskRebalancer { HelixAdmin admin = _manager.getClusterManagmentTool(); IdealState jobIS = admin.getResourceIdealState(_manager.getClusterName(), jobResource); + 
TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource, + new ZNRecord(TaskUtil.USER_CONTENT_NODE)); if (jobIS != null) { LOG.info("Job " + jobResource + " idealstate already exists!"); return; http://git-wip-us.apache.org/repos/asf/helix/blob/adfe4dda/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java new file mode 100644 index 0000000..b2b27ef --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUserContentStore.java @@ -0,0 +1,219 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.UserContentStore; +import org.apache.helix.task.Workflow; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class TestUserContentStore extends TaskTestBase { + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + // Setup cluster and instances + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < _numNodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // start dummy participants + for (int i = 0; i < _numNodes; i++) { + final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + + // Set task callbacks + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, 
TaskFactory>(); + + taskFactoryReg.put("ContentStoreTask", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new ContentStoreTask(); + } + }); + + taskFactoryReg.put("TaskOne", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskOne(); + } + }); + + taskFactoryReg.put("TaskTwo", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TaskTwo(); + } + }); + + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + _participants[i].syncStart(); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Start an admin connection + _manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + } + + @Test + public void testWorkflowAndJobTaskUserContentStore() throws InterruptedException { + String jobName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); + List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1); + Map<String, String> taskConfigMap = Maps.newHashMap(); + TaskConfig taskConfig1 = new TaskConfig("ContentStoreTask", taskConfigMap, false); + taskConfigs.add(taskConfig1); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + + JobConfig.Builder jobBuilder = new JobConfig.Builder().setCommand("DummyCommand") + .addTaskConfigs(taskConfigs) + .setJobCommandConfigMap(jobCommandMap); + 
workflowBuilder.addJob(jobName, jobBuilder); + + _driver.start(workflowBuilder.build()); + _driver.pollForWorkflowState(jobName, TaskState.COMPLETED); + Assert + .assertEquals(_driver.getWorkflowContext(jobName).getWorkflowState(), TaskState.COMPLETED); + } + + @Test + public void testJobContentPutAndGetWithDependency() throws InterruptedException { + String queueName = TestHelper.getTestMethodName(); + JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName, 0, 100); + + List<TaskConfig> taskConfigs1 = Lists.newArrayListWithCapacity(1); + List<TaskConfig> taskConfigs2 = Lists.newArrayListWithCapacity(1); + Map<String, String> taskConfigMap1 = Maps.newHashMap(); + Map<String, String> taskConfigMap2 = Maps.newHashMap(); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap1, false); + TaskConfig taskConfig2 = new TaskConfig("TaskTwo", taskConfigMap2, false); + + taskConfigs1.add(taskConfig1); + taskConfigs2.add(taskConfig2); + Map<String, String> jobCommandMap = Maps.newHashMap(); + jobCommandMap.put("Timeout", "1000"); + + JobConfig.Builder jobBuilder1 = + new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs1) + .setJobCommandConfigMap(jobCommandMap); + JobConfig.Builder jobBuilder2 = + new JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs2) + .setJobCommandConfigMap(jobCommandMap); + + queueBuilder.enqueueJob(queueName + 0, jobBuilder1); + queueBuilder.enqueueJob(queueName + 1, jobBuilder2); + + _driver.start(queueBuilder.build()); + _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, queueName + 1), + TaskState.COMPLETED); + Assert.assertEquals(_driver.getWorkflowContext(queueName) + .getJobState(TaskUtil.getNamespacedJobName(queueName, queueName + 1)), TaskState.COMPLETED); + } + + private static class ContentStoreTask extends UserContentStore implements Task { + + @Override public TaskResult run() { + putUserContent("ContentTest", "Value1", Scope.JOB); + 
putUserContent("ContentTest", "Value2", Scope.WORKFLOW); + putUserContent("ContentTest", "Value3", Scope.TASK); + if (!getUserContent("ContentTest", Scope.JOB).equals("Value1") || !getUserContent( + "ContentTest", Scope.WORKFLOW).equals("Value2") || !getUserContent("ContentTest", + Scope.TASK).equals("Value3")) { + return new TaskResult(TaskResult.Status.FAILED, null); + } + return new TaskResult(TaskResult.Status.COMPLETED, null); + } + + @Override public void cancel() { + } + } + + + private static class TaskOne extends UserContentStore implements Task { + + @Override public TaskResult run() { + putUserContent("RaceTest", "RaceValue", Scope.WORKFLOW); + return new TaskResult(TaskResult.Status.COMPLETED, null); + } + + @Override public void cancel() { + } + } + + private static class TaskTwo extends UserContentStore implements Task { + + @Override public TaskResult run() { + if (!getUserContent("RaceTest", Scope.WORKFLOW).equals("RaceValue")) { + return new TaskResult(TaskResult.Status.FAILED, null); + } + return new TaskResult(TaskResult.Status.COMPLETED, null); + + } + + @Override public void cancel() { + } + } +}
