Repository: helix Updated Branches: refs/heads/master 0beeb8fa2 -> 0c251bbf6
[HELIX-772] add TaskDriver.addUserContent() api and related tests Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0c251bbf Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0c251bbf Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0c251bbf Branch: refs/heads/master Commit: 0c251bbf640206729755301c3dda734eea78343f Parents: 0beeb8f Author: Harry Zhang <hrzh...@linkedin.com> Authored: Tue Oct 30 16:25:12 2018 -0700 Committer: Harry Zhang <hrzh...@linkedin.com> Committed: Wed Oct 31 13:47:52 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 46 ++++- .../java/org/apache/helix/task/TaskUtil.java | 52 +++-- .../task/TestIndependentTaskRebalancer.java | 43 ++-- .../helix/task/TestGetSetUserContentStore.java | 206 +++++++++++++++++++ .../helix/task/TestGetUserContentStore.java | 144 ------------- 5 files changed, 309 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index ea529e8..e6256ed 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -19,8 +19,26 @@ package org.apache.helix.task; * under the License. */ +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.I0Itec.zkclient.DataUpdater; -import org.apache.helix.*; +import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.util.RebalanceScheduler; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; @@ -35,8 +53,6 @@ import org.apache.helix.util.HelixUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - /** * CLI for scheduling/canceling workflows */ @@ -1003,6 +1019,30 @@ public class TaskDriver { } /** + * Set user content defined by the given key and string + * @param key content key + * @param value content value + * @param workflowName name of the workflow - must provide when scope is WORKFLOW + * @param jobName name of the job - must provide when scope is JOB or TASK + * @param taskName name of the task - must provide when scope is TASK + * @param scope scope of the content + */ + public void addUserContent(String key, String value, String workflowName, String jobName, String taskName, + UserContentStore.Scope scope) { + switch (scope) { + case WORKFLOW: + TaskUtil.addWorkflowJobUserContent(_propertyStore, workflowName, key, value); + break; + case JOB: + TaskUtil.addWorkflowJobUserContent(_propertyStore, jobName, key, value); + break; + default: + TaskUtil.addTaskUserContent(_propertyStore, jobName, taskName, key, value); + break; + } + } + + /** * Throw Exception if children nodes will exceed limitation after adding newNodesCount children. * @param newConfigNodeCount */ http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 1ce448c..3461233 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -19,7 +19,6 @@ package org.apache.helix.task; * under the License. */ -import com.google.common.collect.Sets; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -41,12 +40,13 @@ import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.HelixPropertyStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Joiner; +import com.google.common.collect.Sets; /** * Static utility methods. @@ -334,10 +334,19 @@ public class TaskUtil { */ protected static void addWorkflowJobUserContent(final HelixManager manager, String workflowJobResource, final String key, final String value) { - String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, - USER_CONTENT_NODE); + addWorkflowJobUserContent(manager.getHelixPropertyStore(), workflowJobResource, key, value); + } + + /* package */ + static void addWorkflowJobUserContent(final HelixPropertyStore<ZNRecord> propertyStore, + String workflowJobResource, final String key, final String value) { + if (workflowJobResource == null) { + throw new IllegalArgumentException("workflowJobResource must be not null when adding workflow / job user content"); + } + String path = Joiner.on("/") + .join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, USER_CONTENT_NODE); - manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { + propertyStore.update(path, new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord znRecord) { znRecord.setSimpleField(key, value); @@ -372,10 +381,19 @@ public class TaskUtil { */ protected static void addTaskUserContent(final HelixManager manager, String job, final String task, final String key, final String value) { + addTaskUserContent(manager.getHelixPropertyStore(), job, task, key, value); + } + + /* package */ + static void addTaskUserContent(final HelixPropertyStore<ZNRecord> propertyStore, + final String job, final String task, final String key, final String value) { + if (job == null || task == null) { + throw new IllegalArgumentException("job and task must be not null when adding task user content"); + } String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, job, USER_CONTENT_NODE); - manager.getHelixPropertyStore().update(path, new DataUpdater<ZNRecord>() { + propertyStore.update(path, new DataUpdater<ZNRecord>() { @Override public ZNRecord update(ZNRecord znRecord) { if (znRecord.getMapField(task) == null) { @@ -400,14 +418,14 @@ public class TaskUtil { protected static String getUserContent(HelixPropertyStore propertyStore, String key, UserContentStore.Scope scope, String workflowName, String jobName, String taskName) { switch (scope) { - case WORKFLOW: - return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, key); - case JOB: - return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key); - case TASK: - return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, key); - default: - throw new HelixException("Invalid scope : " + scope.name()); + case WORKFLOW: + return TaskUtil.getWorkflowJobUserContent(propertyStore, workflowName, key); + case JOB: + return TaskUtil.getWorkflowJobUserContent(propertyStore, jobName, key); + case TASK: + return TaskUtil.getTaskUserContent(propertyStore, jobName, taskName, key); + default: + throw new HelixException("Invalid scope : " + scope.name()); } } @@ -838,9 +856,7 @@ public class TaskUtil { /** * Check whether tasks are just started or still running - * * @param jobContext The job context - * * @return False if still tasks not in final state. Otherwise return true */ public static boolean checkJobStopped(JobContext jobContext) { @@ -853,10 +869,8 @@ public class TaskUtil { return true; } - /** * Count the number of jobs in a workflow that are not in final state. - * * @param workflowCfg * @param workflowCtx * @return http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 5509858..1b068a0 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.TestHelper; @@ -56,6 +57,7 @@ import org.testng.collections.Sets; public class TestIndependentTaskRebalancer extends TaskTestBase { private Set<String> _invokedClasses = Sets.newHashSet(); private Map<String, Integer> _runCounts = Maps.newHashMap(); + private static final AtomicBoolean _failureCtl = new AtomicBoolean(true); @BeforeClass public void beforeClass() throws Exception { @@ -85,6 +87,25 @@ public class TestIndependentTaskRebalancer extends TaskTestBase { return new TaskTwo(context, instanceName); } }); + taskFactoryReg.put("ControllableFailTask", new TaskFactory() { + @Override public Task createNewTask(TaskCallbackContext context) { + return new Task() { + @Override + public TaskResult run() { + if (_failureCtl.get()) { + return new TaskResult(Status.FAILED, null); + } else { + return new TaskResult(Status.COMPLETED, null); + } + } + + @Override + public void cancel() { + + } + }; + } + }); taskFactoryReg.put("SingleFailTask", new TaskFactory() { @Override public Task createNewTask(TaskCallbackContext context) { @@ -179,15 +200,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase { Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2); - // This is error prone as ThreadCountBasedTaskAssigner will always be re-assign - // task to same instance given we only have 1 task to assign and the order or - // iterating all nodes during assignment is always the same. Rarely some change - // will alter the order of iteration debug assignment so we need to change - // this instance name to keep on testing this functionality. - final String failInstance = "localhost_12919"; - Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap.of("fail", "" + true, - "failInstance", failInstance)); - TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap); + TaskConfig taskConfig1 = new TaskConfig("ControllableFailTask", new HashMap<String, String>()); taskConfigs.add(taskConfig1); Map<String, String> jobCommandMap = Maps.newHashMap(); jobCommandMap.put("Timeout", "1000"); @@ -212,17 +225,15 @@ public class TestIndependentTaskRebalancer extends TaskTestBase { } if (trial == 1000) { + // Fail if no re-attempts Assert.fail("Job " + jobName + " is not retried"); } - // disable failed instance - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, false); - _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + // Signal the next retry to be successful + _failureCtl.set(false); - // Ensure that the class was invoked - Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); - Assert.assertNotSame(_driver.getJobContext(jobName).getAssignedParticipant(0), failInstance); - _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, failInstance, true); + // Verify that retry will go on and the workflow will finally complete + _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); } @Test http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java new file mode 100644 index 0000000..d4ba29a --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestGetSetUserContentStore.java @@ -0,0 +1,206 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.MockTask; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestGetSetUserContentStore extends TaskTestBase { + private static final String JOB_COMMAND = "DummyCommand"; + private static final int NUM_JOB = 5; + private Map<String, String> _jobCommandMap; + + private final CountDownLatch allTasksReady = new CountDownLatch(NUM_JOB); + private final CountDownLatch adminReady = new CountDownLatch(1); + + private enum TaskDumpResultKey { + WorkflowContent, + JobContent, + TaskContent + } + + private class TaskRecord { + String workflowName; + String jobName; + String taskName; + + public TaskRecord(String workflow, String job, String task) { + workflowName = workflow; + jobName = job; + taskName = task; + } + } + + @BeforeClass + public void beforeClass() throws Exception { + _participants = new MockParticipantManager[_numNodes]; + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); + } + + // Setup cluster and instances + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < _numNodes; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // start dummy participants + for (int i = 0; i < _numNodes; i++) { + final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); + + // Set task callbacks + Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); + TaskFactory shortTaskFactory = new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new WriteTask(context); + } + }; + taskFactoryReg.put("WriteTask", shortTaskFactory); + + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", + new TaskStateModelFactory(_participants[i], taskFactoryReg)); + _participants[i].syncStart(); + } + + // Start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Start an admin connection + _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", + InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + _jobCommandMap = new HashMap<>(); + } + + @Test + public void testGetUserContentStore() throws InterruptedException { + String workflowName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); + WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName); + configBuilder.setAllowOverlapJobAssignment(true); + workflowBuilder.setWorkflowConfig(configBuilder.build()); + + Map<String, TaskRecord> recordMap = new HashMap<>(); + // Create 5 jobs with 1 WriteTask each + for (int i = 0; i < NUM_JOB; i++) { + List<TaskConfig> taskConfigs = new ArrayList<>(); + taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>())); + JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) + .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap); + String jobSuffix = "JOB" + i; + String jobName = workflowName + "_" + jobSuffix; + String taskName = jobName + "_0"; + workflowBuilder.addJob("JOB" + i, jobConfigBulider); + recordMap.put(jobName, new TaskRecord(workflowName, jobName, taskName)); + } + + // Start the workflow and wait for all tasks started + _driver.start(workflowBuilder.build()); + allTasksReady.await(); + + // add "workflow":"workflow" to the workflow's user content + _driver.addUserContent(workflowName, workflowName, workflowName, null, null, UserContentStore.Scope.WORKFLOW); + for (TaskRecord rec : recordMap.values()) { + // add "job":"job" to the job's user content + _driver.addUserContent(rec.jobName, rec.jobName, null, rec.jobName, null, UserContentStore.Scope.JOB); + // String taskId = _driver.getJobContext(rec.jobName).getTaskIdForPartition(0); + + + // add "taskId":"taskId" to the task's user content + _driver.addUserContent(rec.taskName, rec.taskName, null, rec.jobName, rec.taskName, UserContentStore.Scope.TASK); + } + adminReady.countDown(); + _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); + + // Aggregate key-value mappings in UserContentStore + for (TaskRecord rec : recordMap.values()) { + Assert.assertEquals(_driver + .getUserContent(TaskDumpResultKey.WorkflowContent.name(), UserContentStore.Scope.WORKFLOW, + rec.workflowName, rec.jobName, rec.taskName), + constructContentStoreResultString(rec.workflowName, rec.workflowName)); + Assert.assertEquals(_driver + .getUserContent(TaskDumpResultKey.JobContent.name(), UserContentStore.Scope.JOB, + rec.workflowName, rec.jobName, rec.taskName), + constructContentStoreResultString(rec.jobName, rec.jobName)); + Assert.assertEquals(_driver + .getUserContent(TaskDumpResultKey.TaskContent.name(), UserContentStore.Scope.TASK, + rec.workflowName, rec.jobName, rec.taskName), + constructContentStoreResultString(rec.taskName, rec.taskName)); + } + } + + /** + * A mock task that writes to UserContentStore. MockTask extends UserContentStore. + */ + private class WriteTask extends MockTask { + + public WriteTask(TaskCallbackContext context) { + super(context); + } + + @Override + public TaskResult run() { + allTasksReady.countDown(); + try { + adminReady.await(); + } catch (Exception e) { + return new TaskResult(TaskResult.Status.FATAL_FAILED, e.getMessage()); + } + String workflowStoreContent = constructContentStoreResultString(_workflowName, getUserContent(_workflowName, Scope.WORKFLOW)); + String jobStoreContent = constructContentStoreResultString(_jobName, getUserContent(_jobName, Scope.JOB)); + String taskStoreContent = constructContentStoreResultString(_taskName, getUserContent(_taskName, Scope.TASK)); + putUserContent(TaskDumpResultKey.WorkflowContent.name(), workflowStoreContent, Scope.WORKFLOW); + putUserContent(TaskDumpResultKey.JobContent.name(), jobStoreContent, Scope.JOB); + putUserContent(TaskDumpResultKey.TaskContent.name(), taskStoreContent, Scope.TASK); + return new TaskResult(TaskResult.Status.COMPLETED, ""); + } + } + + private static String constructContentStoreResultString(String key, String value) { + return String.format("%s::%s", key, value); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/0c251bbf/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java b/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java deleted file mode 100644 index 392c278..0000000 --- a/helix-core/src/test/java/org/apache/helix/task/TestGetUserContentStore.java +++ /dev/null @@ -1,144 +0,0 @@ -package org.apache.helix.task; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.TestHelper; -import org.apache.helix.integration.manager.ClusterControllerManager; -import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.integration.task.MockTask; -import org.apache.helix.integration.task.TaskTestBase; -import org.apache.helix.participant.StateMachineEngine; -import org.apache.helix.tools.ClusterSetup; -import org.testng.Assert; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -public class TestGetUserContentStore extends TaskTestBase { - private static final String JOB_COMMAND = "DummyCommand"; - private Map<String, String> _jobCommandMap; - - @BeforeClass - public void beforeClass() throws Exception { - _participants = new MockParticipantManager[_numNodes]; - String namespace = "/" + CLUSTER_NAME; - if (_gZkClient.exists(namespace)) { - _gZkClient.deleteRecursively(namespace); - } - - // Setup cluster and instances - ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); - setupTool.addCluster(CLUSTER_NAME, true); - for (int i = 0; i < _numNodes; i++) { - String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); - setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); - } - - // start dummy participants - for (int i = 0; i < _numNodes; i++) { - final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i); - - // Set task callbacks - Map<String, TaskFactory> taskFactoryReg = new HashMap<>(); - TaskFactory shortTaskFactory = new TaskFactory() { - @Override - public Task createNewTask(TaskCallbackContext context) { - return new WriteTask(context); - } - }; - taskFactoryReg.put("WriteTask", shortTaskFactory); - - _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); - - // Register a Task state model factory. - StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); - stateMachine.registerStateModelFactory("Task", - new TaskStateModelFactory(_participants[i], taskFactoryReg)); - _participants[i].syncStart(); - } - - // Start controller - String controllerName = CONTROLLER_PREFIX + "_0"; - _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); - _controller.syncStart(); - - // Start an admin connection - _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", - InstanceType.ADMINISTRATOR, ZK_ADDR); - _manager.connect(); - _driver = new TaskDriver(_manager); - - _jobCommandMap = new HashMap<>(); - } - - @Test - public void testGetUserContentStore() throws InterruptedException { - String workflowName = TestHelper.getTestMethodName(); - Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName); - WorkflowConfig.Builder configBuilder = new WorkflowConfig.Builder(workflowName); - configBuilder.setAllowOverlapJobAssignment(true); - workflowBuilder.setWorkflowConfig(configBuilder.build()); - - List<String> jobsThatRan = new ArrayList<>(); - // Create 5 jobs with 1 WriteTask each - for (int i = 0; i < 5; i++) { - List<TaskConfig> taskConfigs = new ArrayList<>(); - taskConfigs.add(new TaskConfig("WriteTask", new HashMap<String, String>())); - JobConfig.Builder jobConfigBulider = new JobConfig.Builder().setCommand(JOB_COMMAND) - .addTaskConfigs(taskConfigs).setJobCommandConfigMap(_jobCommandMap); - workflowBuilder.addJob("JOB" + i, jobConfigBulider); - jobsThatRan.add(workflowName + "_JOB" + i); - } - - // Start the workflow and wait until completion - _driver.start(workflowBuilder.build()); - _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED); - - // Aggregate key-value mappings in UserContentStore - int runCount = 0; - for (String jobName : jobsThatRan) { - String value = _driver.getUserContent(jobName, UserContentStore.Scope.WORKFLOW, workflowName, - jobName, null); - runCount += Integer.parseInt(value); - } - Assert.assertEquals(runCount, 5); - } - - /** - * A mock task that writes to UserContentStore. MockTask extends UserContentStore. - */ - private class WriteTask extends MockTask { - - public WriteTask(TaskCallbackContext context) { - super(context); - } - - @Override - public TaskResult run() { - putUserContent(_jobName, Integer.toString(1), Scope.WORKFLOW); - return new TaskResult(TaskResult.Status.COMPLETED, ""); - } - } -}