Repository: helix Updated Branches: refs/heads/helix-0.6.x 0474ee0ee -> 05440cbea
[HELIX-416] Support recurring scheduled tasks Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/991d1a3e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/991d1a3e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/991d1a3e Branch: refs/heads/helix-0.6.x Commit: 991d1a3ebece8bf26c9557437b975bf1d9133b28 Parents: 923e714 Author: Kanak Biscuitwala <[email protected]> Authored: Mon Jun 23 13:59:59 2014 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Mon Jun 23 13:59:59 2014 -0700 ---------------------------------------------------------------------- .../stages/ResourceComputationStage.java | 11 +- .../org/apache/helix/task/ScheduleConfig.java | 29 ++- .../org/apache/helix/task/TaskConstants.java | 4 + .../java/org/apache/helix/task/TaskDriver.java | 36 +++- .../org/apache/helix/task/TaskRebalancer.java | 131 +++++++++--- .../java/org/apache/helix/task/TaskUtil.java | 211 +++++++++++++++++++ .../java/org/apache/helix/task/Workflow.java | 4 +- .../org/apache/helix/task/WorkflowConfig.java | 38 +--- .../org/apache/helix/task/WorkflowContext.java | 9 + .../apache/helix/task/beans/WorkflowBean.java | 4 +- 10 files changed, 396 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index c38462c..5676098 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -55,15 +55,18 @@ public 
class ResourceComputationStage extends AbstractBaseStage { for (IdealState idealState : idealStates.values()) { Set<String> partitionSet = idealState.getPartitionSet(); String resourceName = idealState.getResourceName(); - - for (String partition : partitionSet) { - addPartition(partition, resourceName, resourceMap); - Resource resource = resourceMap.get(resourceName); + if (!resourceMap.containsKey(resourceName)) { + Resource resource = new Resource(resourceName); + resourceMap.put(resourceName, resource); resource.setStateModelDefRef(idealState.getStateModelDefRef()); resource.setStateModelFactoryName(idealState.getStateModelFactoryName()); resource.setBucketSize(idealState.getBucketSize()); resource.setBatchMessageMode(idealState.getBatchMessageMode()); } + + for (String partition : partitionSet) { + addPartition(partition, resourceName, resourceMap); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java index 9e3801e..b123793 100644 --- a/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/ScheduleConfig.java @@ -87,12 +87,6 @@ public class ScheduleConfig { * @return true if valid, false if invalid */ public boolean isValid() { - // For now, disallow recurring workflows - if (isRecurring()) { - LOG.error("Recurring workflows are not currently supported."); - return false; - } - // All schedules must have a start time even if they are recurring if (_startTime == null) { LOG.error("All schedules must have a start time!"); @@ -141,25 +135,28 @@ public class ScheduleConfig { return new ScheduleConfig(startTime, null, null); } - /* + /** * Create a schedule for a recurring workflow that should start 
immediately * @param recurUnit the unit of the recurrence interval * @param recurInterval the magnitude of the recurrence interval * @return instantiated ScheduleConfig - * public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) { - * return new ScheduleConfig(new Date(), recurUnit, recurInterval); - * } */ + public static ScheduleConfig recurringFromNow(TimeUnit recurUnit, long recurInterval) { + return new ScheduleConfig(new Date(), recurUnit, recurInterval); + } - /* + /** * Create a schedule for a recurring workflow that should start at a specific time - * @param startTime the time to start the workflow the first time + * @param startTime the time to start the workflow the first time, or null if now * @param recurUnit the unit of the recurrence interval * @param recurInterval the magnitude of the recurrence interval * @return instantiated ScheduleConfig - * public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit, - * long recurInterval) { - * return new ScheduleConfig(startTime, recurUnit, recurInterval); - * } */ + public static ScheduleConfig recurringFromDate(Date startTime, TimeUnit recurUnit, + long recurInterval) { + if (startTime == null) { + startTime = new Date(); + } + return new ScheduleConfig(startTime, recurUnit, recurInterval); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java index 305323d..34008d6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskConstants.java @@ -39,4 +39,8 @@ public class TaskConstants { * The root property store path at which the {@link TaskRebalancer} stores context information. 
*/ public static final String REBALANCER_CONTEXT_ROOT = "/TaskRebalancer"; + /** + * Resource prefix for scheduled workflows + */ + public static final String SCHEDULED = "SCHEDULED"; } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index 1ec6848..d7e8e25 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import org.I0Itec.zkclient.DataUpdater; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.GnuParser; @@ -35,6 +36,7 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; @@ -42,6 +44,7 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.log4j.Logger; @@ -221,11 +224,38 @@ public class TaskDriver { /** Helper function to change target state for a given task */ private void setTaskTargetState(String jobResource, TargetState state) { + setSingleTaskTargetState(jobResource, state); + + // For recurring schedules, child workflows must also be handled HelixDataAccessor accessor = 
_manager.getHelixDataAccessor(); - HelixProperty p = new HelixProperty(jobResource); - p.getRecord().setSimpleField(WorkflowConfig.TARGET_STATE, state.name()); - accessor.updateProperty(accessor.keyBuilder().resourceConfig(jobResource), p); + List<String> resources = accessor.getChildNames(accessor.keyBuilder().resourceConfigs()); + for (String resource : resources) { + String prefix = resource + "_" + TaskConstants.SCHEDULED; + if (resource.startsWith(prefix)) { + setSingleTaskTargetState(resource, state); + } + } + } + /** Helper function to change target state for a given task */ + private void setSingleTaskTargetState(String jobResource, final TargetState state) { + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + // Only update target state for non-completed workflows + String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); + if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { + currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name()); + } + return currentData; + } + }; + List<DataUpdater<ZNRecord>> updaters = Lists.newArrayList(); + updaters.add(updater); + List<String> paths = Lists.newArrayList(); + paths.add(accessor.keyBuilder().resourceConfig(jobResource).getPath()); + accessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); invokeRebalance(); } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index dc0eb33..2e4e300 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; @@ -53,6 +54,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; /** @@ -61,12 +63,12 @@ import com.google.common.collect.Sets; public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { private static final Logger LOG = Logger.getLogger(TaskRebalancer.class); - /** Management of already-scheduled workflows across jobs */ + // Management of already-scheduled workflows across jobs private static final BiMap<String, Date> SCHEDULED_WORKFLOWS = HashBiMap.create(); private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors .newSingleThreadScheduledExecutor(); - /** For connection management */ + // For connection management private HelixManager _manager; /** @@ -118,12 +120,6 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { WorkflowConfig workflowCfg = TaskUtil.getWorkflowCfg(_manager, workflowResource); WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource); - // Check for readiness, and stop processing if it's not ready - boolean isReady = scheduleIfNotReady(workflowCfg, workflowResource, resourceName); - if (!isReady) { - return emptyAssignment(resourceName); - } - // Initialize workflow context if needed if (workflowCtx == null) { workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext")); @@ -134,7 +130,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { for (String parent 
: workflowCfg.getJobDag().getDirectParents(resourceName)) { if (workflowCtx.getJobState(parent) == null || !workflowCtx.getJobState(parent).equals(TaskState.COMPLETED)) { - return emptyAssignment(resourceName); + return emptyAssignment(resourceName, currStateOutput); } } @@ -142,7 +138,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { cleanup(_manager, resourceName, workflowCfg, workflowResource); - return emptyAssignment(resourceName); + return emptyAssignment(resourceName, currStateOutput); } // Check if this workflow has been finished past its expiry. @@ -150,7 +146,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) { markForDeletion(_manager, workflowResource); cleanup(_manager, resourceName, workflowCfg, workflowResource); - return emptyAssignment(resourceName); + return emptyAssignment(resourceName, currStateOutput); } // Fetch any existing context information from the property store. @@ -163,9 +159,17 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // The job is already in a final state (completed/failed). 
if (workflowCtx.getJobState(resourceName) == TaskState.FAILED || workflowCtx.getJobState(resourceName) == TaskState.COMPLETED) { - return emptyAssignment(resourceName); + return emptyAssignment(resourceName, currStateOutput); + } + + // Check for readiness, and stop processing if it's not ready + boolean isReady = + scheduleIfNotReady(workflowCfg, workflowCtx, workflowResource, resourceName, clusterData); + if (!isReady) { + return emptyAssignment(resourceName, currStateOutput); } + // Grab the old assignment, or an empty one if it doesn't exist ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName); if (prevAssignment == null) { prevAssignment = new ResourceAssignment(resourceName); @@ -342,8 +346,9 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { if (!successOptional) { workflowCtx.setJobState(jobResource, TaskState.FAILED); workflowCtx.setWorkflowState(TaskState.FAILED); + workflowCtx.setFinishTime(System.currentTimeMillis()); addAllPartitions(allPartitions, partitionsToDropFromIs); - return emptyAssignment(jobResource); + return emptyAssignment(jobResource, currStateOutput); } else { skippedPartitions.add(pId); partitionsToDropFromIs.add(pId); @@ -425,12 +430,14 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { /** * Check if a workflow is ready to schedule, and schedule a rebalance if it is not * @param workflowCfg the workflow to check + * @param workflowCtx the current workflow context * @param workflowResource the Helix resource associated with the workflow * @param jobResource a job from the workflow + * @param cache the current snapshot of the cluster * @return true if ready, false if not ready */ - private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, String workflowResource, - String jobResource) { + private boolean scheduleIfNotReady(WorkflowConfig workflowCfg, WorkflowContext workflowCtx, + String workflowResource, String jobResource, 
ClusterDataCache cache) { // Ignore non-scheduled workflows if (workflowCfg == null || workflowCfg.getScheduleConfig() == null) { return true; @@ -439,11 +446,66 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { // Figure out when this should be run, and if it's ready, then just run it ScheduleConfig scheduleConfig = workflowCfg.getScheduleConfig(); Date startTime = scheduleConfig.getStartTime(); - long delay = startTime.getTime() - new Date().getTime(); - if (delay <= 0) { - SCHEDULED_WORKFLOWS.remove(workflowResource); - SCHEDULED_WORKFLOWS.inverse().remove(startTime); - return true; + long currentTime = new Date().getTime(); + long delayFromStart = startTime.getTime() - currentTime; + + if (delayFromStart <= 0) { + // Remove any timers that are past-time for this workflow + Date scheduledTime = SCHEDULED_WORKFLOWS.get(workflowResource); + if (scheduledTime != null && currentTime > scheduledTime.getTime()) { + SCHEDULED_WORKFLOWS.remove(workflowResource); + } + + // Recurring workflows are just templates that spawn new workflows + if (scheduleConfig.isRecurring()) { + // Skip scheduling this workflow if it's not in a start state + if (!workflowCfg.getTargetState().equals(TargetState.START)) { + return false; + } + + // Skip scheduling this workflow again if the previous run (if any) is still active + String lastScheduled = workflowCtx.getLastScheduledSingleWorkflow(); + if (lastScheduled != null) { + WorkflowContext lastWorkflowCtx = TaskUtil.getWorkflowContext(_manager, lastScheduled); + if (lastWorkflowCtx == null + || lastWorkflowCtx.getFinishTime() == WorkflowContext.UNFINISHED) { + return false; + } + } + + // Figure out how many jumps are needed, thus the time to schedule the next workflow + // The negative of the delay is the amount of time past the start time + long period = + scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval()); + long offsetMultiplier = (-delayFromStart) / period; + long 
timeToSchedule = period * offsetMultiplier + startTime.getTime(); + + // Now clone the workflow if this clone has not yet been created + String newWorkflowName = + workflowResource + "_" + TaskConstants.SCHEDULED + "_" + offsetMultiplier; + if (lastScheduled == null || !lastScheduled.equals(newWorkflowName)) { + Workflow clonedWf = + TaskUtil.cloneWorkflow(_manager, workflowResource, newWorkflowName, new Date( + timeToSchedule)); + TaskDriver driver = new TaskDriver(_manager); + try { + // Start the cloned workflow + driver.start(clonedWf); + } catch (Exception e) { + LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e); + } + // Persist workflow start regardless of success to avoid retrying and failing + workflowCtx.setLastScheduledSingleWorkflow(newWorkflowName); + TaskUtil.setWorkflowContext(_manager, workflowResource, workflowCtx); + } + + // Change the time to trigger the pipeline to that of the next run + startTime = new Date(timeToSchedule + period); + delayFromStart = startTime.getTime() - System.currentTimeMillis(); + } else { + // This is a one-time workflow and is ready + return true; + } } // No need to schedule the same runnable at the same time @@ -452,11 +514,22 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { return false; } + scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart); + return false; + } + + private void scheduleRebalance(String workflowResource, String jobResource, Date startTime, + long delayFromStart) { + // No need to schedule the same runnable at the same time + if (SCHEDULED_WORKFLOWS.containsKey(workflowResource) + || SCHEDULED_WORKFLOWS.inverse().containsKey(startTime)) { + return; + } + // For workflows not yet scheduled, schedule them and record it RebalanceInvoker rebalanceInvoker = new RebalanceInvoker(_manager, jobResource); SCHEDULED_WORKFLOWS.put(workflowResource, startTime); - SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delay, 
TimeUnit.MILLISECONDS); - return false; + SCHEDULED_EXECUTOR.schedule(rebalanceInvoker, delayFromStart, TimeUnit.MILLISECONDS); } /** @@ -602,8 +675,18 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } - private static ResourceAssignment emptyAssignment(String name) { - return new ResourceAssignment(name); + private static ResourceAssignment emptyAssignment(String name, CurrentStateOutput currStateOutput) { + ResourceAssignment assignment = new ResourceAssignment(name); + Set<Partition> partitions = currStateOutput.getCurrentStateMappedPartitions(name); + for (Partition partition : partitions) { + Map<String, String> currentStateMap = currStateOutput.getCurrentStateMap(name, partition); + Map<String, String> replicaMap = Maps.newHashMap(); + for (String instanceName : currentStateMap.keySet()) { + replicaMap.put(instanceName, HelixDefinedState.DROPPED.toString()); + } + assignment.addReplicaMap(partition, replicaMap); + } + return assignment; } private static void addCompletedPartitions(Set<Integer> set, JobContext ctx, http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index a5fc026..1aa75d6 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -20,10 +20,14 @@ package org.apache.helix.task; */ import java.io.IOException; +import java.text.ParseException; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.helix.AccessOption; import org.apache.helix.ConfigAccessor; @@ -42,6 +46,7 @@ import 
org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -61,6 +66,9 @@ public class TaskUtil { */ public static JobConfig getJobCfg(HelixManager manager, String jobResource) { HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource); + if (jobResourceConfig == null) { + return null; + } JobConfig.Builder b = JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields()); Map<String, Map<String, String>> rawTaskConfigMap = @@ -74,13 +82,33 @@ public class TaskUtil { return b.build(); } + /** + * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object. + * @param manager Helix manager object used to connect to Helix. + * @param workflowResource The name of the workflow resource. + * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the + * workflow, null otherwise. + */ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) { Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource); + if (workflowCfg == null) { + return null; + } WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg); return b.build(); } + /** + * Request a state change for a specific task. 
+ * @param accessor connected Helix data accessor + * @param instance the instance serving the task + * @param sessionId the current session of the instance + * @param resource the job name + * @param partition the task partition name + * @param state the requested state + * @return true if the request was persisted, false otherwise + */ public static boolean setRequestedState(HelixDataAccessor accessor, String instance, String sessionId, String resource, String partition, TaskPartitionState state) { LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state, @@ -99,11 +127,23 @@ public class TaskUtil { } } + /** + * Get a Helix configuration scope at a resource (i.e. job and workflow) level + * @param clusterName the cluster containing the resource + * @param resource the resource name + * @return instantiated {@link HelixConfigScope} + */ public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) { return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE) .forCluster(clusterName).forResource(resource).build(); } + /** + * Get the last task assignment for a given job + * @param manager a connection to Helix + * @param resourceName the name of the job + * @return {@link ResourceAssignment} instance, or null if no assignment is available + */ public static ResourceAssignment getPrevResourceAssignment(HelixManager manager, String resourceName) { ZNRecord r = @@ -113,6 +153,12 @@ public class TaskUtil { return r != null ? 
new ResourceAssignment(r) : null; } + /** + * Set the last task assignment for a given job + * @param manager a connection to Helix + * @param resourceName the name of the job + * @param ra {@link ResourceAssignment} containing the task assignment + */ public static void setPrevResourceAssignment(HelixManager manager, String resourceName, ResourceAssignment ra) { manager.getHelixPropertyStore().set( @@ -120,6 +166,12 @@ public class TaskUtil { ra.getRecord(), AccessOption.PERSISTENT); } + /** + * Get the runtime context of a single job + * @param manager a connection to Helix + * @param jobResource the name of the job + * @return the {@link JobContext}, or null if none is available + */ public static JobContext getJobContext(HelixManager manager, String jobResource) { ZNRecord r = manager.getHelixPropertyStore().get( @@ -128,12 +180,24 @@ public class TaskUtil { return r != null ? new JobContext(r) : null; } + /** + * Set the runtime context of a single job + * @param manager a connection to Helix + * @param jobResource the name of the job + * @param ctx the up-to-date {@link JobContext} for the job + */ public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) { manager.getHelixPropertyStore().set( Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT); } + /** + * Get the runtime context of a single workflow + * @param manager a connection to Helix + * @param workflowResource the name of the workflow + * @return the {@link WorkflowContext}, or null if none is available + */ public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) { ZNRecord r = manager.getHelixPropertyStore().get( @@ -142,6 +206,12 @@ public class TaskUtil { return r != null ?
new WorkflowContext(r) : null; } + /** + * Set the runtime context of a single workflow + * @param manager a connection to Helix + * @param workflowResource the name of the workflow + * @param ctx the up-to-date {@link WorkflowContext} for the workflow + */ public static void setWorkflowContext(HelixManager manager, String workflowResource, WorkflowContext ctx) { manager.getHelixPropertyStore().set( @@ -149,14 +219,45 @@ public class TaskUtil { ctx.getRecord(), AccessOption.PERSISTENT); } + /** + * Get a workflow-qualified job name for a single-job workflow + * @param singleJobWorkflow the name of the single-job workflow + * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow + */ public static String getNamespacedJobName(String singleJobWorkflow) { return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow); } + /** + * Get a workflow-qualified job name for a job in that workflow + * @param workflowResource the name of the workflow + * @param jobName the un-namespaced name of the job + * @return The namespaced job name, which is just workflowResource_jobName + */ public static String getNamespacedJobName(String workflowResource, String jobName) { return workflowResource + "_" + jobName; } + /** + * Remove the workflow namespace from the job name + * @param workflowResource the name of the workflow that owns the job + * @param jobName the namespaced job name + * @return the denamespaced job name, or the same job name if it is already denamespaced + */ + public static String getDenamespacedJobName(String workflowResource, String jobName) { + if (jobName.contains(workflowResource)) { + // skip the entire length of the workflow name plus the underscore + return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1); + } else { + return jobName; + } + } + + /** + * Serialize a map of job-level configurations as a single string + * @param commandConfig map of job config key to config value + * @return serialized
string + */ public static String serializeJobConfigMap(Map<String, String> commandConfig) { ObjectMapper mapper = new ObjectMapper(); try { @@ -168,6 +269,11 @@ public class TaskUtil { return null; } + /** + * Deserialize a single string into a map of job-level configurations + * @param commandConfig the serialized job config map + * @return a map of job config key to config value + */ public static Map<String, String> deserializeJobConfigMap(String commandConfig) { ObjectMapper mapper = new ObjectMapper(); try { @@ -192,6 +298,111 @@ public class TaskUtil { accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource)); } + /** + * Get a ScheduleConfig from a workflow config string map + * @param cfg the string map + * @return a ScheduleConfig if one exists, otherwise null + */ + public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) { + // Parse schedule-specific configs, if they exist + Date startTime = null; + if (cfg.containsKey(WorkflowConfig.START_TIME)) { + try { + startTime = WorkflowConfig.DEFAULT_DATE_FORMAT.parse(cfg.get(WorkflowConfig.START_TIME)); + } catch (ParseException e) { + LOG.error("Unparseable date " + cfg.get(WorkflowConfig.START_TIME), e); + return null; + } + } + if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT) + && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) { + return ScheduleConfig.recurringFromDate(startTime, + TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)), + Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL))); + } else if (startTime != null) { + return ScheduleConfig.oneTimeDelayedStart(startTime); + } + return null; + } + + /** + * Create a new workflow based on an existing one + * @param manager connection to Helix + * @param origWorkflowName the name of the existing workflow + * @param newWorkflowName the name of the new workflow + * @param newStartTime a provided start time that deviates from the desired start time + * @return the cloned 
workflow, or null if there was a problem cloning the existing one + */ + public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName, + String newWorkflowName, Date newStartTime) { + // Read all resources, including the workflow and jobs of interest + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Map<String, HelixProperty> resourceConfigMap = + accessor.getChildValuesMap(keyBuilder.resourceConfigs()); + if (!resourceConfigMap.containsKey(origWorkflowName)) { + LOG.error("No such workflow named " + origWorkflowName); + return null; + } + if (resourceConfigMap.containsKey(newWorkflowName)) { + LOG.error("Workflow with name " + newWorkflowName + " already exists!"); + return null; + } + + // Create a new workflow with a new name + HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName); + Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields(); + JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG)); + Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren(); + Workflow.Builder builder = new Workflow.Builder(newWorkflowName); + + // Set the workflow expiry + builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY))); + + // Set the schedule, if applicable + ScheduleConfig scheduleConfig; + if (newStartTime != null) { + scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime); + } else { + scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields); + } + if (scheduleConfig != null) { + builder.setScheduleConfig(scheduleConfig); + } + + // Add each job back as long as the original exists + Set<String> namespacedJobs = jobDag.getAllNodes(); + for (String namespacedJob : namespacedJobs) { + if (resourceConfigMap.containsKey(namespacedJob)) { + // Copy over job-level and task-level configs + String job = getDenamespacedJobName(origWorkflowName, namespacedJob); + 
HelixProperty jobConfig = resourceConfigMap.get(namespacedJob); + Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields(); + jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name + for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) { + builder.addConfig(job, e.getKey(), e.getValue()); + } + Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields(); + List<TaskConfig> taskConfigs = Lists.newLinkedList(); + for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) { + TaskConfig taskConfig = TaskConfig.from(rawTaskConfig); + taskConfigs.add(taskConfig); + } + builder.addTaskConfigs(job, taskConfigs); + + // Add dag dependencies + Set<String> children = parentsToChildren.get(namespacedJob); + if (children != null) { + for (String namespacedChild : children) { + String child = getDenamespacedJobName(origWorkflowName, namespacedChild); + builder.addParentChildDependency(job, child); + } + } + } + } + return builder.build(); + } + private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) { HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource); ConfigAccessor configAccessor = manager.getConfigAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/Workflow.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/Workflow.java b/helix-core/src/main/java/org/apache/helix/task/Workflow.java index 7e54347..84680d3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/Workflow.java +++ b/helix-core/src/main/java/org/apache/helix/task/Workflow.java @@ -205,6 +205,7 @@ public class Workflow { if (wf.schedule != null) { builder.setScheduleConfig(ScheduleConfig.from(wf.schedule)); } + builder.setExpiry(wf.expiry); return builder.build(); } @@ -251,7 +252,7 @@ 
public class Workflow { _dag = new JobDag(); _jobConfigs = new TreeMap<String, Map<String, String>>(); _taskConfigs = new TreeMap<String, List<TaskConfig>>(); - _expiry = -1; + _expiry = WorkflowConfig.DEFAULT_EXPIRY; } public Builder addConfig(String job, String key, String val) { @@ -329,7 +330,6 @@ public class Workflow { if (_expiry > 0) { builder.setExpiry(_expiry); } - return new Workflow(_name, builder.build(), _jobConfigs, _taskConfigs); // calls validate // internally } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java index da404e5..9b20c4f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowConfig.java @@ -19,20 +19,14 @@ package org.apache.helix.task; * under the License. */ -import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Map; import java.util.TimeZone; -import org.apache.log4j.Logger; - /** * Provides a typed interface to workflow level configurations. Validates the configurations. 
*/ public class WorkflowConfig { - private static final Logger LOG = Logger.getLogger(WorkflowConfig.class); - /* Config fields */ public static final String DAG = "Dag"; public static final String TARGET_STATE = "TargetState"; @@ -50,10 +44,10 @@ public class WorkflowConfig { } /* Member variables */ - private JobDag _jobDag; - private TargetState _targetState; - private long _expiry; - private ScheduleConfig _scheduleConfig; + private final JobDag _jobDag; + private final TargetState _targetState; + private final long _expiry; + private final ScheduleConfig _scheduleConfig; private WorkflowConfig(JobDag jobDag, TargetState targetState, long expiry, ScheduleConfig scheduleConfig) { @@ -85,10 +79,6 @@ public class WorkflowConfig { private long _expiry = DEFAULT_EXPIRY; private ScheduleConfig _scheduleConfig; - public Builder() { - // Nothing to do - } - public WorkflowConfig build() { validate(); @@ -117,7 +107,6 @@ public class WorkflowConfig { public static Builder fromMap(Map<String, String> cfg) { Builder b = new Builder(); - if (cfg.containsKey(EXPIRY)) { b.setExpiry(Long.parseLong(cfg.get(EXPIRY))); } @@ -129,22 +118,9 @@ public class WorkflowConfig { } // Parse schedule-specific configs, if they exist - Date startTime = null; - if (cfg.containsKey(START_TIME)) { - try { - startTime = DEFAULT_DATE_FORMAT.parse(cfg.get(START_TIME)); - } catch (ParseException e) { - LOG.error("Unparseable date " + cfg.get(START_TIME), e); - } - } - if (cfg.containsKey(RECURRENCE_UNIT) && cfg.containsKey(RECURRENCE_INTERVAL)) { - /* - * b.setScheduleConfig(ScheduleConfig.recurringFromDate(startTime, - * TimeUnit.valueOf(cfg.get(RECURRENCE_UNIT)), - * Long.parseLong(cfg.get(RECURRENCE_INTERVAL)))); - */ - } else if (startTime != null) { - b.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(startTime)); + ScheduleConfig scheduleConfig = TaskUtil.parseScheduleFromConfigMap(cfg); + if (scheduleConfig != null) { + b.setScheduleConfig(scheduleConfig); } return b; } 
http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java index 4feda1b..6ad71a1 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowContext.java @@ -34,6 +34,7 @@ public class WorkflowContext extends HelixProperty { public static final String START_TIME = "START_TIME"; public static final String FINISH_TIME = "FINISH_TIME"; public static final String TASK_STATES = "TASK_STATES"; + public static final String LAST_SCHEDULED_WORKFLOW = "LAST_SCHEDULED_WORKFLOW"; public static final int UNFINISHED = -1; public WorkflowContext(ZNRecord record) { @@ -106,4 +107,12 @@ public class WorkflowContext extends HelixProperty { return Long.parseLong(tStr); } + + public void setLastScheduledSingleWorkflow(String wf) { + _record.setSimpleField(LAST_SCHEDULED_WORKFLOW, wf); + } + + public String getLastScheduledSingleWorkflow() { + return _record.getSimpleField(LAST_SCHEDULED_WORKFLOW); + } } http://git-wip-us.apache.org/repos/asf/helix/blob/991d1a3e/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java index 2ea23c7..a59e818 100644 --- a/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java +++ b/helix-core/src/main/java/org/apache/helix/task/beans/WorkflowBean.java @@ -21,12 +21,14 @@ package org.apache.helix.task.beans; import java.util.List; +import org.apache.helix.task.WorkflowConfig; + /** * Bean class used for parsing workflow definitions from 
YAML. */ public class WorkflowBean { public String name; - public String expiry; public List<JobBean> jobs; public ScheduleBean schedule; + public long expiry = WorkflowConfig.DEFAULT_EXPIRY; }
