Repository: helix Updated Branches: refs/heads/helix-0.6.x 21f09efa0 -> 923e714ec
[HELIX-440] One-time scheduling for task framework Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2d9f6c28 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2d9f6c28 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2d9f6c28 Branch: refs/heads/helix-0.6.x Commit: 2d9f6c28287ffe4529e90bcdc6c823122078d58f Parents: 3d80422 Author: Kanak Biscuitwala <[email protected]> Authored: Thu Jun 5 09:37:31 2014 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Mon Jun 9 17:15:51 2014 -0700 ---------------------------------------------------------------------- .../org/apache/helix/task/ScheduleConfig.java | 165 +++++++++++++++++++ .../org/apache/helix/task/TaskRebalancer.java | 74 +++++++++ .../java/org/apache/helix/task/TaskUtil.java | 12 ++ .../java/org/apache/helix/task/Workflow.java | 33 ++++ .../org/apache/helix/task/WorkflowConfig.java | 55 ++++++- .../apache/helix/task/beans/ScheduleBean.java | 32 ++++ .../apache/helix/task/beans/WorkflowBean.java | 1 + .../task/TestIndependentTaskRebalancer.java | 34 ++++ 8 files changed, 404 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java new file mode 100644 index 0000000..9e3801e --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java @@ -0,0 +1,165 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.task.beans.ScheduleBean; +import org.apache.log4j.Logger; + +/** + * Configuration for scheduling both one-time and recurring workflows in Helix + */ +public class ScheduleConfig { + private static final Logger LOG = Logger.getLogger(ScheduleConfig.class); + + /** Enforce that a workflow can recur at most once per minute */ + private static final long MIN_RECURRENCE_MILLIS = 60 * 1000; + + private final Date _startTime; + private final TimeUnit _recurUnit; + private final Long _recurInterval; + + private ScheduleConfig(Date startTime, TimeUnit recurUnit, Long recurInterval) { + _startTime = startTime; + _recurUnit = recurUnit; + _recurInterval = recurInterval; + } + + /** + * When the workflow should be started + * @return Date object representing the start time + */ + public Date getStartTime() { + return _startTime; + } + + /** + * The unit of the recurrence interval if this is a recurring workflow + * @return the recurrence interval unit, or null if this workflow is a one-time workflow + */ + public TimeUnit getRecurrenceUnit() { + return _recurUnit; + } + + /** + * The magnitude of the recurrence interval if this is a recurring task + * @return the recurrence interval magnitude, or null if this workflow is a one-time workflow + */ + public Long getRecurrenceInterval() { + return 
_recurInterval; + } + + /** + * Check if this workflow is recurring + * @return true if recurring, false if one-time + */ + public boolean isRecurring() { + return _recurUnit != null && _recurInterval != null; + } + + /** + * Check if the configured schedule is valid given these constraints: + * <ul> + * <li>All workflows must have a start time</li> + * <li>Recurrence unit and interval must both be present if either is present</li> + * <li>Recurring workflows must have a positive interval magnitude</li> + * <li>Intervals must be at least one minute</li> + * </ul> + * @return true if valid, false if invalid + */ + public boolean isValid() { + // For now, disallow recurring workflows + if (isRecurring()) { + LOG.error("Recurring workflows are not currently supported."); + return false; + } + + // All schedules must have a start time even if they are recurring + if (_startTime == null) { + LOG.error("All schedules must have a start time!"); + return false; + } + + // Recurrence properties must both either be present or absent + if ((_recurUnit == null && _recurInterval != null) + || (_recurUnit != null && _recurInterval == null)) { + LOG.error("Recurrence interval and unit must either both be present or both be absent"); + return false; + } + + // Only positive recurrence intervals are allowed if present + if (_recurInterval != null && _recurInterval <= 0) { + LOG.error("Recurrence interval must be positive"); + return false; + } + + // Enforce minimum interval length + if (_recurUnit != null) { + long converted = _recurUnit.toMillis(_recurInterval); + if (converted < MIN_RECURRENCE_MILLIS) { + LOG.error("Recurrence must be at least " + MIN_RECURRENCE_MILLIS + " ms"); + return false; + } + } + return true; + } + + /** + * Create this configuration from a serialized bean + * @param bean flat configuration of the schedule + * @return instantiated ScheduleConfig + */ + public static ScheduleConfig from(ScheduleBean bean) { + return new ScheduleConfig(bean.startTime, 
bean.recurUnit, bean.recurInterval); + } + + /** + * Create a schedule for a workflow that runs once at a specified time + * @param startTime the time to start the workflow + * @return instantiated ScheduleConfig + */ + public static ScheduleConfig oneTimeDelayedStart(Date startTime) { + return new ScheduleConfig(startTime, null, null); + } + + /* + * Create a schedule for a recurring workflow that should start immediately + * @param recurUnit the unit of the recurrence interval + * @param recurInterval the magnitude of the recurrence interval + * @return instantiated ScheduleConfig + * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) { + * return new ScheduleConfig(new Date(), recurUnit, recurInterval); + * } + */ + + /* + * Create a schedule for a recurring workflow that should start at a specific time + * @param startTime the time to start the workflow the first time + * @param recurUnit the unit of the recurrence interval + * @param recurInterval the magnitude of the recurrence interval + * @return instantiated ScheduleConfig + * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit, + * long recurInterval) { + * return new ScheduleConfig(startTime, recurUnit, recurInterval); + * } + */ +} http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index a6244c8..dc0eb33 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -21,6 +21,7 @@ package org.apache.helix.task; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import 
java.util.List; @@ -29,6 +30,9 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; @@ -46,6 +50,8 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.log4j.Logger; import com.google.common.base.Joiner; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; @@ -54,6 +60,13 @@ import com.google.common.collect.Sets; */ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { private static final Logger LOG = Logger.getLogger(TaskRebalancer.class); + + /** Management of already-scheduled workflows across jobs */ + private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create(); + private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors + .newSingleThreadScheduledExecutor(); + + /** For connection management */ private HelixManager _manager; /** @@ -105,6 +118,12 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); + // Check for readiness, and stop processing if it's not ready + boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName); + if (!isReady) { + return emptyAssignment(resourceName); + } + // Initialize workflow context if needed if (workflowCtx == null) { workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); @@ -404,6 +423,43 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } /** + * Check if a workflow is ready to schedule, and 
schedule a rebalance if it is not + * @param workflowCfg the workflow to check + * @param workflowResource the Helix resource associated with the workflow + * @param jobResource a job from the workflow + * @return true if ready, false if not ready + */ + private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource, + String jobResource) { + // Ignore non-scheduled workflows + if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) { + return true; + } + + // Figure out when this should be run, and if it's ready, then just run it + ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); + Date startTime = scheduleConfig.getStartTime(); + long delay = startTime.getTime() - new Date().getTime(); + if (delay <= 0) { + SCHEDULED_WORKFLOWS.remove(workflowResource); + SCHEDULED_WORKFLOWS.inverse().remove(startTime); + return true; + } + + // No need to schedule the same runnable at the same time + if (SCHEDULED_WORKFLOWS.containsKey(workflowResource) + || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) { + return false; + } + + // For workflows not yet scheduled, schedule them and record it + RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); + SCHEDULED_WORKFLOWS.put(workflowResource, startTime); + SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, TimeUnit.MILLISECONDS); + return false; + } + + /** * Checks if the job has completed. * @param ctx The rebalancer context. * @param allPartitions The set of partitions to check. @@ -649,4 +705,22 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // so this part can just be a no-op. 
return currentIdealState; } + + /** + * The simplest possible runnable that will trigger a run of the controller pipeline + */ + private static class RebalanceInvoker implements Runnable { + private final HelixManager _manager; + private final String _resource; + + public RebalanceInvoker(HelixManager manager, String resource) { + _manager = manager; + _resource = resource; + } + + @Override + public void run() { + TaskUtil.invokeRebalance(_manager, _resource); + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index a5c97ac..a5fc026 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.model.CurrentState; import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.IdealState; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.log4j.Logger; @@ -180,6 +181,17 @@ public class TaskUtil { return Collections.emptyMap(); } + /** + * Trigger a controller pipeline execution for a given resource. 
+ * @param manager Helix connection + * @param resource the name of the resource whose ideal state is touched to trigger the execution + */ + public static void invokeRebalance(HelixManager manager, String resource) { + // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource)); + } + private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) { HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource); ConfigAccessor configAccessor = manager.getConfigAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 57404d8..7e54347 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -26,6 +26,7 @@ import java.io.Reader; import java.io.StringReader; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,12 +82,31 @@ public class Workflow { return _taskConfigs; } + public WorkflowConfig getWorkflowConfig() { + return _workflowConfig; + } + public Map<String, String> getResourceConfigMap() throws Exception { Map<String, String> cfgMap = new HashMap<String, String>(); cfgMap.put(WorkflowConfig.DAG, _workflowConfig.getJobDag().toJson()); cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(_workflowConfig.getExpiry())); cfgMap.put(WorkflowConfig.TARGET_STATE, _workflowConfig.getTargetState().name()); + // Populate schedule if present + ScheduleConfig scheduleConfig = 
_workflowConfig.getScheduleConfig(); + if (scheduleConfig != null) { + Date startTime = scheduleConfig.getStartTime(); + if (startTime != null) { + String formattedTime = WorkflowConfig.DEFAULT_DATE_FORMAT.format(startTime); + cfgMap.put(WorkflowConfig.START_TIME, formattedTime); + } + if (scheduleConfig.isRecurring()) { + cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, scheduleConfig.getRecurrenceUnit().toString()); + cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, scheduleConfig.getRecurrenceInterval() + .toString()); + } + } + return cfgMap; } @@ -182,6 +202,10 @@ public class Workflow { } } + if (wf.schedule != null) { + builder.setScheduleConfig(ScheduleConfig.from(wf.schedule)); + } + return builder.build(); } @@ -219,6 +243,7 @@ public class Workflow { private JobDag _dag; private Map<String, Map<String, String>> _jobConfigs; private Map<String, List<TaskConfig>> _taskConfigs; + private ScheduleConfig _scheduleConfig; private long _expiry; public Builder(String name) { @@ -275,6 +300,11 @@ public class Workflow { return this; } + public Builder setScheduleConfig(ScheduleConfig scheduleConfig) { + _scheduleConfig = scheduleConfig; + return this; + } + public Builder setExpiry(long expiry) { _expiry = expiry; return this; @@ -293,6 +323,9 @@ public class Workflow { WorkflowConfig.Builder builder = new WorkflowConfig.Builder(); builder.setTaskDag(_dag); builder.setTargetState(TargetState.START); + if (_scheduleConfig != null) { + builder.setScheduleConfig(_scheduleConfig); + } if (_expiry > 0) { builder.setExpiry(_expiry); } http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index 6f10955..da404e5 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java 
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -19,29 +19,48 @@ package org.apache.helix.task; * under the License. */ +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; +import java.util.TimeZone; + +import org.apache.log4j.Logger; /** * Provides a typed interface to workflow level configurations. Validates the configurations. */ public class WorkflowConfig { + private static final Logger LOG = Logger.getLogger(WorkflowConfig.class); + /* Config fields */ public static final String DAG = "Dag"; public static final String TARGET_STATE = "TargetState"; public static final String EXPIRY = "Expiry"; + public static final String START_TIME = "StartTime"; + public static final String RECURRENCE_UNIT = "RecurrenceUnit"; + public static final String RECURRENCE_INTERVAL = "RecurrenceInterval"; /* Default values */ public static final long DEFAULT_EXPIRY = 24 * 60 * 60 * 1000; + public static final SimpleDateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat( + "MM-dd-yyyy HH:mm:ss"); + static { + DEFAULT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); + } /* Member variables */ private JobDag _jobDag; private TargetState _targetState; private long _expiry; + private ScheduleConfig _scheduleConfig; - private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry) { + private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry, + ScheduleConfig scheduleConfig) { _jobDag = jobDag; _targetState = targetState; _expiry = expiry; + _scheduleConfig = scheduleConfig; } public JobDag getJobDag() { @@ -56,10 +75,15 @@ public class WorkflowConfig { return _expiry; } + public ScheduleConfig getScheduleConfig() { + return _scheduleConfig; + } + public static class Builder { private JobDag _taskDag = JobDag.EMPTY_DAG; private TargetState _targetState = TargetState.START; private long _expiry = DEFAULT_EXPIRY; + private ScheduleConfig _scheduleConfig; public 
Builder() { // Nothing to do @@ -68,7 +92,7 @@ public class WorkflowConfig { public WorkflowConfig build() { validate(); - return new WorkflowConfig(_taskDag, _targetState, _expiry); + return new WorkflowConfig(_taskDag, _targetState, _expiry, _scheduleConfig); } public Builder setTaskDag(JobDag v) { @@ -86,6 +110,11 @@ public class WorkflowConfig { return this; } + public Builder setScheduleConfig(ScheduleConfig scheduleConfig) { + _scheduleConfig = scheduleConfig; + return this; + } + public static Builder fromMap(Map<String, String> cfg) { Builder b = new Builder(); @@ -99,6 +128,24 @@ public class WorkflowConfig { b.setTargetState(TargetState.valueOf(cfg.get(TARGET_STATE))); } + // Parse schedule-specific configs, if they exist + Date startTime = null; + if (cfg.containsKey(START_TIME)) { + try { + startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME)); + } catch (ParseException e) { + LOG.error("Unparseable date " + cfg.get(START_TIME), e); + } + } + if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) { + /* + * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime, + * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)), + * Long.parseLong(cfg.get(RECURRENCE_INTERVAL)))); + */ + } else if (startTime != null) { + b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime)); + } return b; } @@ -106,6 +153,10 @@ public class WorkflowConfig { if (_expiry < 0) { throw new IllegalArgumentException( String.format("%s has invalid value %s", EXPIRY, _expiry)); + } else if (_scheduleConfig != null && !_scheduleConfig.isValid()) { + throw new IllegalArgumentException( + "Scheduler configuration is invalid. 
The configuration must have a start time if it is " + + "one-time, and it must have a positive interval magnitude if it is recurring"); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java new file mode 100644 index 0000000..9e843f5 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/task/beans/ScheduleBean.java @@ -0,0 +1,32 @@ +package org.apache.helix.task.beans; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +/** + * A bean representing how a workflow can be scheduled in Helix + */ +public class ScheduleBean { + public Date startTime; + public Long recurInterval; + public TimeUnit recurUnit; +} http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java index 76da4c8..2ea23c7 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java @@ -28,4 +28,5 @@ public class WorkflowBean { public String name; public String expiry; public List<JobBean> jobs; + public ScheduleBean schedule; } http://git-wip-us.apache.org/repos/asf/helix/blob/2d9f6c28/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java index 10f0ac7..b5856b1 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java @@ -19,6 +19,7 @@ package org.apache.helix.integration.task; * under the License. 
*/ +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.integration.task.TestTaskRebalancerStopResume.ReindexTask; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.task.JobConfig; +import org.apache.helix.task.ScheduleConfig; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; @@ -43,7 +45,9 @@ import org.apache.helix.task.TaskResult; import org.apache.helix.task.TaskResult.Status; import org.apache.helix.task.TaskState; import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; import org.apache.helix.task.Workflow; +import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -245,6 +249,36 @@ public class TestIndependentTaskRebalancer extends ZkIntegrationTestBase { Assert.assertTrue(_runCounts.values().contains(1)); } + @Test + public void testOneTimeScheduled() throws Exception { + String jobName = TestHelper.getTestMethodName(); + Workflow.Builder workflowBuilder = new Workflow.Builder(jobName); + List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1); + Map<String, String> taskConfigMap = Maps.newHashMap(); + TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false); + taskConfigs.add(taskConfig1); + workflowBuilder.addTaskConfigs(jobName, taskConfigs); + workflowBuilder.addConfig(jobName, JobConfig.COMMAND, "DummyCommand"); + Map<String, String> jobConfigMap = Maps.newHashMap(); + jobConfigMap.put("Timeout", "1000"); + workflowBuilder.addJobConfigMap(jobName, jobConfigMap); + long inFiveSeconds = System.currentTimeMillis() + (5 * 1000); + workflowBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(new Date(inFiveSeconds))); + 
_driver.start(workflowBuilder.build()); + + // Ensure the job completes + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.IN_PROGRESS); + TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); + + // Ensure that the class was invoked + Assert.assertTrue(_invokedClasses.contains(TaskOne.class.getName())); + + // Check that the workflow only started after the start time (with a 1 second buffer) + WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, jobName); + long startTime = workflowCtx.getStartTime(); + Assert.assertTrue((startTime + 1000) >= inFiveSeconds); + } + private class TaskOne extends ReindexTask { private final boolean _shouldFail; private final String _instanceName;
