Repository: helix Updated Branches: refs/heads/master 343f8dd33 -> 3ec93129e
[HELIX-732] Expose UserContentStore in TaskDriver There was a user request for this feature. The intended use is to allow for aggregation work reading from temporary data written by tasks, by allowing a get() of UserContentStore at the TaskDriver level. UserContentStore is a potentially useful feature that is currently under-utilized - this will enable Gobblin and other users of Task Framework to better utilize UserContentStore. Changelist: 1. Add getUserContent() in TaskDriver 2. Add TestGetUserContentStore, an integration test for this feature 3. Add descriptive JavaDoc warning the user that the get() and put() methods of UserContentStore are not thread-safe Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3ec93129 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3ec93129 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3ec93129 Branch: refs/heads/master Commit: 3ec93129e717185cd1db0fc40d09e7603d8aee5d Parents: 343f8dd Author: Hunter Lee <[email protected]> Authored: Mon Jul 16 12:18:03 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Mon Jul 16 14:56:34 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 17 ++- .../java/org/apache/helix/task/TaskUtil.java | 48 +++++-- .../org/apache/helix/task/UserContentStore.java | 53 ++++--- .../helix/task/TestGetUserContentStore.java | 144 +++++++++++++++++++ 4 files changed, 220 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index bc767b7..4fe732b 100644 --- 
a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -988,6 +988,21 @@ public class TaskDriver { } /** + * Returns the lookup of UserContentStore by key. + * @param key key used at write time by a task implementing UserContentStore + * @param scope scope used at write time + * @param workflowName name of workflow. Must be supplied + * @param jobName name of job. Optional if scope is WORKFLOW + * @param taskName name of task. Optional if scope is WORKFLOW or JOB + * @return null if key-value pair not found or this content store does not exist. Otherwise, + * return a String + */ + public String getUserContent(String key, UserContentStore.Scope scope, String workflowName, + String jobName, String taskName) { + return TaskUtil.getUserContent(_propertyStore, key, scope, workflowName, jobName, taskName); + } + + /** * Throw Exception if children nodes will exceed limitation after adding newNodesCount children. 
* @param newConfigNodeCount */ @@ -999,4 +1014,4 @@ public class TaskDriver { "Cannot create more workflows or jobs because there are already too many items created in the path CONFIGS."); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 68a9d4a..9ca0062 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -309,17 +309,17 @@ public class TaskUtil { } /** - * Get user defined workflow or job level key-value pair data - * @param manager a connection to Helix - * @param workflowJobResource the name of workflow - * @param key the key of key-value pair + * Get user-defined workflow/job scope key-value pair data. This method takes + * HelixPropertyStore<ZNRecord>. + * @param propertyStore + * @param workflowJobResource + * @param key * @return null if there is no such pair, otherwise return a String */ - protected static String getWorkflowJobUserContent(HelixManager manager, + protected static String getWorkflowJobUserContent(HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource, String key) { - ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/") - .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE), null, - AccessOption.PERSISTENT); + ZNRecord r = propertyStore.get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, + workflowJobResource, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); return r != null ? 
r.getSimpleField(key) : null; } @@ -346,15 +346,15 @@ public class TaskUtil { /** * Get user defined task level key-value pair data - * @param manager a connection to Helix + * @param propertyStore * @param job the name of job * @param task the name of the task * @param key the key of key-value pair * @return null if there is no such pair, otherwise return a String */ - protected static String getTaskUserContent(HelixManager manager, String job, String task, - String key) { - ZNRecord r = manager.getHelixPropertyStore().get( + protected static String getTaskUserContent(HelixPropertyStore<ZNRecord> propertyStore, String job, + String task, String key) { + ZNRecord r = propertyStore.get( Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE), null, AccessOption.PERSISTENT); return r != null ? (r.getMapField(task) != null ? r.getMapField(task).get(key) : null) : null; @@ -386,6 +386,30 @@ public class TaskUtil { } /** + * Helper method for looking up UserContentStore content. 
+ * @param propertyStore + * @param key + * @param scope + * @param workflowName + * @param jobName + * @param taskName + * @return value corresponding to the key + */ + protected static String getUserContent(HelixPropertyStore propertyStore, String key, + UserContentStore.Scope scope, String workflowName, String jobName, String taskName) { + switch (scope) { + case WORKFLOW: + return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, key); + case JOB: + return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key); + case TASK: + return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, key); + default: + throw new HelixException("Invalid scope : " + scope.name()); + } + } + + /** * Get a workflow-qualified job name for a single-job workflow * @param singleJobWorkflow the name of the single-job workflow * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java index b188be7..ba80e88 100644 --- a/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java +++ b/helix-core/src/main/java/org/apache/helix/task/UserContentStore.java @@ -25,7 +25,6 @@ import org.apache.helix.HelixManager; /** * UserContentStore provides default implementation of user defined key-value pair store per task, * job and workflow level. 
- * * TODO: This class should be merged to Task interface when Helix bump up to Java 8 */ public abstract class UserContentStore { @@ -47,10 +46,10 @@ public abstract class UserContentStore { TASK } - private HelixManager _manager; - private String _workflowName; - private String _jobName; - private String _taskName; + protected HelixManager _manager; + protected String _workflowName; + protected String _jobName; + protected String _taskName; /** * Default initialization of user content store @@ -67,44 +66,40 @@ public abstract class UserContentStore { } /** - * Default implementation for user defined put key-value pair + * Default implementation for user defined put key-value pair. Warning: this method is not + * thread-safe - we recommend creating a different key-value pair instead of modifying the value + * on the same key. * @param key The key of key-value pair * @param value The value of key-value pair * @param scope The scope defines which layer to store */ public void putUserContent(String key, String value, Scope scope) { switch (scope) { - case WORKFLOW: - TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value); - break; - case JOB: - TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value); - break; - case TASK: - TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value); - break; - default: - throw new HelixException("Invalid scope : " + scope.name()); + case WORKFLOW: + TaskUtil.addWorkflowJobUserContent(_manager, _workflowName, key, value); + break; + case JOB: + TaskUtil.addWorkflowJobUserContent(_manager, _jobName, key, value); + break; + case TASK: + TaskUtil.addTaskUserContent(_manager, _jobName, _taskName, key, value); + break; + default: + throw new HelixException("Invalid scope : " + scope.name()); } } /** - * Default implementation for user defined get key-value pair + * Default implementation for user defined get key-value pair. 
Warning: this method is not + * thread-safe - we recommend creating a different key-value pair instead of modifying the value + * on the same key. * @param key The key of key-value pair * @param scope The scope defines which layer that key-value pair stored * @return Null if key-value pair not found or this content store does not exists. Otherwise, * return a String */ public String getUserContent(String key, Scope scope) { - switch (scope) { - case WORKFLOW: - return TaskUtil.getWorkflowJobUserContent(_manager, _workflowName, key); - case JOB: - return TaskUtil.getWorkflowJobUserContent(_manager, _jobName, key); - case TASK: - return TaskUtil.getTaskUserContent(_manager, _jobName, _taskName, key); - default: - throw new HelixException("Invalid scope : " + scope.name()); - } + return TaskUtil.getUserContent(_manager.getHelixPropertyStore(), key, scope, _workflowName, + _jobName, _taskName); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/3ec93129/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java new file mode 100644 index 0000000..392c278 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java @@ -0,0 +1,144 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestGetUserContentStore extends TaskTestBase { + private static final String JOB_COMMAND = "DummyCommand"; + private Map<String, String> _jobCommandMap; + + @BeforeClass + public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); + } + + // Setup cluster and instances + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < _numNodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // start dummy participants + for (int i = 0; i < _numNodes; i++) { + final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + + // 
Set task callbacks + Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); + TaskFactory shortTaskFactory = new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new WriteTask(context); + } + }; + taskFactoryReg.put("WriteTask", shortTaskFactory); + + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + _participants[i].syncStart(); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Start an admin connection + _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", + InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + _jobCommandMap = new HashMap<>(); + } + + @Test + public void testGetUserContentStore() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); + WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName); + configBuilder.setAllowOverlapJobAssignment(true); + workflowBuilder.setWorkflowConfig(configBuilder.build()); + + List<String> jobsThatRan = new ArrayList<>(); + // Create 5 jobs with 1 WriteTask each + for (int i = 0; i < 5; i++) { + List<TaskConfig> taskConfigs = new ArrayList<>(); + taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>())); + JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) + .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap); + workflowBuilder.addJob("JOB" + i, jobConfigBulider); + jobsThatRan.add(workflowName + 
"_JOB" + i); + } + + // Start the workflow and wait until completion + _driver.start(workflowBuilder.build()); + _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + + // Aggregate key-value mappings in UserContentStore + int runCount = 0; + for (String jobName : jobsThatRan) { + String value = _driver.getUserContent(jobName, UserContentStore.Scope.WORKFLOW, workflowName, + jobName, null); + runCount += Integer.parseInt(value); + } + Assert.assertEquals(runCount, 5); + } + + /** + * A mock task that writes to UserContentStore. MockTask extends UserContentStore. + */ + private class WriteTask extends MockTask { + + public WriteTask(TaskCallbackContext context) { + super(context); + } + + @Override + public TaskResult run() { + putUserContent(_jobName, Integer.toString(1), Scope.WORKFLOW); + return new TaskResult(TaskResult.Status.COMPLETED, ""); + } + } +}
