Repository: helix Updated Branches: refs/heads/helix-0.6.x ba98d03d5 -> 1e017f16a
http://git-wip-us.apache.org/repos/asf/helix/blob/1e017f16/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java new file mode 100644 index 0000000..4839a9a --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java @@ -0,0 +1,321 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; + +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.TaskContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskPartitionState; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowContext; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; + +public class TestTaskRebalancer extends ZkIntegrationTestBase { + private static final int n = 5; + private static final int START_PORT = 12918; + private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private static final int NUM_PARTITIONS = 20; + private static final int NUM_REPLICAS = 3; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private final MockParticipantManager[] _participants = new MockParticipantManager[n]; + private ClusterControllerManager _controller; + + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < n; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // Set up target db + setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS, + MASTER_SLAVE_STATE_MODEL); + setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("Reindex", new TaskFactory() { + @Override + public Task createNewTask(String config) { + return new ReindexTask(config); + } + }); + + // start dummy participants + for (int i = 0; i < n; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + _driver = new TaskDriver(_manager); + + boolean result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + // _controller = null; + for (int i = 0; i < n; i++) { + _participants[i].syncStop(); + // _participants[i] = null; + } + + _manager.disconnect(); + } + + @Test + public void basic() throws Exception { + basic(100); + } + + @Test + public void zeroTaskCompletionTime() throws Exception { + basic(0); + } + + @Test + public void testExpiry() throws Exception { + String taskName = "Expiry"; + long expiry = 1000; + Workflow flow = + WorkflowGenerator + .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName, + TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build(); + + _driver.start(flow); + TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS); + + // Running workflow should have config and context viewable through accessor + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName); + String workflowPropStoreKey = + Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName); + + // Ensure context and config exist + Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, + AccessOption.PERSISTENT)); + Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null); + + // Wait for task to finish and expire + TestUtil.pollForWorkflowState(_manager, taskName, TaskState.COMPLETED); + Thread.sleep(expiry); + _driver.invokeRebalance(); + Thread.sleep(expiry); + + // Ensure workflow config and context were cleaned up by now + Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, + AccessOption.PERSISTENT)); + Assert.assertEquals(accessor.getProperty(workflowCfgKey), null); + } + + private void basic(long taskCompletionTime) throws Exception { + // We use a different resource name in each test method as a work around for a helix participant + // bug where it does + // not clear locally cached state when a resource partition is dropped. Once that is fixed we + // should change these + // tests to use the same resource name and implement a beforeMethod that deletes the task + // resource. + final String taskResource = "basic" + taskCompletionTime; + Workflow flow = + WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource, + TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build(); + _driver.start(flow); + + // Wait for task completion + TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED); + + // Ensure all partitions are completed individually + TaskContext ctx = + TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource)); + for (int i = 0; i < NUM_PARTITIONS; i++) { + Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED); + Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1); + } + } + + @Test + public void partitionSet() throws Exception { + final String taskResource = "partitionSet"; + ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13); + + // construct and submit our basic workflow + Workflow flow = + WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource, + TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION, + String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)) + .build(); + _driver.start(flow); + + // wait for task completeness/timeout + TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED); + + // see if resulting context completed successfully for our partition set + String namespacedName = TaskUtil.getNamespacedTaskName(taskResource); + + TaskContext ctx = TaskUtil.getTaskContext(_manager, namespacedName); + WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_manager, taskResource); + Assert.assertNotNull(ctx); + Assert.assertNotNull(workflowContext); + Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED); + for (int i : targetPartitions) { + Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED); + Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1); + } + } + + @Test + public void testRepeatedWorkflow() throws Exception { + String workflowName = "SomeWorkflow"; + Workflow flow = + WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build(); + new TaskDriver(_manager).start(flow); + + // Wait until the task completes + TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED); + + // Assert completion for all tasks within two minutes + for (String task : flow.getTaskConfigs().keySet()) { + TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED); + } + } + + @Test + public void timeouts() throws Exception { + final String taskResource = "timeouts"; + Workflow flow = + WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource, + TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2), + TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build(); + _driver.start(flow); + + // Wait until the task reports failure. + TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED); + + // Check that all partitions timed out up to maxAttempts + TaskContext ctx = + TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource)); + int maxAttempts = 0; + for (int i = 0; i < NUM_PARTITIONS; i++) { + TaskPartitionState state = ctx.getPartitionState(i); + if (state != null) { + Assert.assertEquals(state, TaskPartitionState.TIMED_OUT); + maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i)); + } + } + Assert.assertEquals(maxAttempts, 2); + } + + private static class ReindexTask implements Task { + private final long _delay; + private volatile boolean _canceled; + + public ReindexTask(String cfg) { + _delay = Long.parseLong(cfg); + } + + @Override + public TaskResult run() { + long expiry = System.currentTimeMillis() + _delay; + long timeLeft; + while (System.currentTimeMillis() < expiry) { + if (_canceled) { + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 + : timeLeft)); + } + sleep(50); + } + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.COMPLETED, + String.valueOf(timeLeft < 0 ? 0 : timeLeft)); + } + + @Override + public void cancel() { + _canceled = true; + } + + private static void sleep(long d) { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/1e017f16/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java new file mode 100644 index 0000000..cd260ee --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java @@ -0,0 +1,212 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.Map; + +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.Workflow; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase { + private static final Logger LOG = Logger.getLogger(TestTaskRebalancerStopResume.class); + private static final int n = 5; + private static final int START_PORT = 12918; + private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private static final String TGT_DB = "TestDB"; + private static final String TASK_RESOURCE = "SomeTask"; + private static final int NUM_PARTITIONS = 20; + private static final int NUM_REPLICAS = 3; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private final MockParticipantManager[] _participants = new MockParticipantManager[n]; + private ClusterControllerManager _controller; + + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < n; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // Set up target db + setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL); + setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS); + + Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>(); + taskFactoryReg.put("Reindex", new TaskFactory() { + @Override + public Task createNewTask(String config) { + return new ReindexTask(config); + } + }); + + // start dummy participants + for (int i = 0; i < n; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + + _driver = new TaskDriver(_manager); + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier( + ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (int i = 0; i < n; i++) { + _participants[i].syncStop(); + } + _manager.disconnect(); + } + + @Test + public void stopAndResume() throws Exception { + Workflow flow = + WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE, + TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build(); + + LOG.info("Starting flow " + flow.getName()); + _driver.start(flow); + TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS); + + LOG.info("Pausing task"); + _driver.stop(TASK_RESOURCE); + TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.STOPPED); + + LOG.info("Resuming task"); + _driver.resume(TASK_RESOURCE); + TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED); + } + + @Test + public void stopAndResumeWorkflow() throws Exception { + String workflow = "SomeWorkflow"; + Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build(); + + LOG.info("Starting flow " + workflow); + _driver.start(flow); + TestUtil.pollForWorkflowState(_manager, workflow, TaskState.IN_PROGRESS); + + LOG.info("Pausing workflow"); + _driver.stop(workflow); + TestUtil.pollForWorkflowState(_manager, workflow, TaskState.STOPPED); + + LOG.info("Resuming workflow"); + _driver.resume(workflow); + TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED); + } + + public static class ReindexTask implements Task { + private final long _delay; + private volatile boolean _canceled; + + public ReindexTask(String cfg) { + _delay = Long.parseLong(cfg); + } + + @Override + public TaskResult run() { + long expiry = System.currentTimeMillis() + _delay; + long timeLeft; + while (System.currentTimeMillis() < expiry) { + if (_canceled) { + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 + : timeLeft)); + } + sleep(50); + } + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.COMPLETED, + String.valueOf(timeLeft < 0 ? 0 : timeLeft)); + } + + @Override + public void cancel() { + _canceled = true; + } + + private static void sleep(long d) { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/1e017f16/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java new file mode 100644 index 0000000..470d59d --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestUtil.java @@ -0,0 +1,69 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixManager; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowContext; +import org.testng.Assert; + +/** + * Static test utility methods. + */ +public class TestUtil { + /** + * Polls {@link org.apache.helix.task.TaskContext} for given task resource until a timeout is + * reached. + * If the task has not reached target state by then, an error is thrown + * @param workflowResource Resource to poll for completeness + * @throws InterruptedException + */ + public static void pollForWorkflowState(HelixManager manager, String workflowResource, + TaskState state) throws InterruptedException { + // Wait for completion. + long st = System.currentTimeMillis(); + WorkflowContext ctx; + do { + Thread.sleep(100); + ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + } while ((ctx == null || ctx.getWorkflowState() == null || ctx.getWorkflowState() != state) + && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */); + + Assert.assertNotNull(ctx); + Assert.assertEquals(ctx.getWorkflowState(), state); + } + + public static void pollForTaskState(HelixManager manager, String workflowResource, + String taskName, TaskState state) throws InterruptedException { + // Wait for completion. + long st = System.currentTimeMillis(); + WorkflowContext ctx; + do { + Thread.sleep(100); + ctx = TaskUtil.getWorkflowContext(manager, workflowResource); + } while ((ctx == null || ctx.getTaskState(taskName) == null || ctx.getTaskState(taskName) != state) + && System.currentTimeMillis() < st + 2 * 60 * 1000 /* 2 mins */); + + Assert.assertNotNull(ctx); + Assert.assertEquals(ctx.getWorkflowState(), state); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/1e017f16/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java new file mode 100644 index 0000000..478e944 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/WorkflowGenerator.java @@ -0,0 +1,66 @@ +package org.apache.helix.integration.task; + +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.helix.task.Workflow; + +/** + * Convenience class for generating various test workflows + */ +public class WorkflowGenerator { + public static final String DEFAULT_TGT_DB = "TestDB"; + private static final String TASK_NAME_1 = "SomeTask1"; + private static final String TASK_NAME_2 = "SomeTask2"; + + private static final Map<String, String> DEFAULT_TASK_CONFIG; + static { + Map<String, String> tmpMap = new TreeMap<String, String>(); + tmpMap.put("TargetResource", DEFAULT_TGT_DB); + tmpMap.put("TargetPartitionStates", "MASTER"); + tmpMap.put("Command", "Reindex"); + tmpMap.put("CommandConfig", String.valueOf(2000)); + tmpMap.put("TimeoutPerPartition", String.valueOf(10 * 1000)); + DEFAULT_TASK_CONFIG = Collections.unmodifiableMap(tmpMap); + } + + public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs( + String taskName, String... cfgs) { + if (cfgs.length % 2 != 0) { + throw new IllegalArgumentException( + "Additional configs should have even number of keys and values"); + } + Workflow.Builder bldr = generateDefaultSingleTaskWorkflowBuilder(taskName); + for (int i = 0; i < cfgs.length; i += 2) { + bldr.addConfig(taskName, cfgs[i], cfgs[i + 1]); + } + + return bldr; + } + + public static Workflow.Builder generateDefaultSingleTaskWorkflowBuilder(String taskName) { + return generateSingleTaskWorkflowBuilder(taskName, DEFAULT_TASK_CONFIG); + } + + public static Workflow.Builder generateSingleTaskWorkflowBuilder(String taskName, + Map<String, String> config) { + Workflow.Builder builder = new Workflow.Builder(taskName); + for (String key : config.keySet()) { + builder.addConfig(taskName, key, config.get(key)); + } + return builder; + } + + public static Workflow.Builder generateDefaultRepeatedTaskWorkflowBuilder(String workflowName) { + Workflow.Builder builder = new Workflow.Builder(workflowName); + builder.addParentChildDependency(TASK_NAME_1, TASK_NAME_2); + + for (String key : DEFAULT_TASK_CONFIG.keySet()) { + builder.addConfig(TASK_NAME_1, key, DEFAULT_TASK_CONFIG.get(key)); + builder.addConfig(TASK_NAME_2, key, DEFAULT_TASK_CONFIG.get(key)); + } + + return builder; + } +}
