Repository: falcon Updated Branches: refs/heads/master 2804c5d1a -> 7f4ff1a37
FALCON-1512 Implement touch feature for native scheduler Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7f4ff1a3 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7f4ff1a3 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7f4ff1a3 Branch: refs/heads/master Commit: 7f4ff1a37293305367b6ee3a33140eadfe1ac1b5 Parents: 2804c5d Author: Pallavi Rao <[email protected]> Authored: Tue Dec 15 14:49:35 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Tue Dec 15 14:49:35 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../java/org/apache/falcon/util/DateUtil.java | 21 ++++++++ .../workflow/engine/OozieWorkflowEngine.java | 20 ++----- .../falcon/execution/NotificationHandler.java | 3 +- .../falcon/workflow/engine/DAGEngine.java | 8 +++ .../workflow/engine/FalconWorkflowEngine.java | 48 ++++++++++++++++- .../falcon/workflow/engine/OozieDAGEngine.java | 56 ++++++++++++++------ .../apache/falcon/execution/MockDAGEngine.java | 4 ++ 8 files changed, 129 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5753535..86a5c9a 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1512 Implement touch feature for native scheduler (Pallavi Rao) + FALCON-1233 Support co-existence of Oozie scheduler (coord) and Falcon native scheduler (Pallavi Rao) FALCON-1596 Spring shell based CLI for Falcon http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index b70fa20..baf5b13 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -78,4 +78,25 @@ public final class DateUtil { return null; } } + + /** + * Returns the current time, with seconds and milliseconds reset to 0. + * @return + */ + public static Date now() { + Calendar cal = Calendar.getInstance(); + cal.set(Calendar.SECOND, 0); + cal.set(Calendar.MILLISECOND, 0); + return cal.getTime(); + } + + /** + * Adds the supplied number of seconds to the given date and returns the new Date. + * @param date + * @param seconds + * @return + */ + public static Date offsetTime(Date date, int seconds) { + return new Date(1000L * seconds + date.getTime()); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 91230b2..b486357 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -46,6 +46,7 @@ import org.apache.falcon.resource.InstancesSummaryResult; import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.update.UpdateHelper; +import org.apache.falcon.util.DateUtil; import org.apache.falcon.util.OozieUtils; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; @@ -1130,7 +1131,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { if (bundle != MISSING && entityUpdated) { LOG.info("Updating entity through Workflow Engine {}", newEntity.toShortString()); Date newEndTime = EntityUtil.getEndTime(newEntity, cluster); - if (newEndTime.before(now())) { + if (newEndTime.before(DateUtil.now())) { throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(newEndTime) + " is before current time. Entity can't be updated. Use remove and add"); } @@ -1234,17 +1235,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return result.toString(); } - private Date now() { - Calendar cal = Calendar.getInstance(); - cal.set(Calendar.SECOND, 0); - cal.set(Calendar.MILLISECOND, 0); - return cal.getTime(); - } - - private Date offsetTime(Date date, int minute) { - return new Date(1000L * 60 * minute + date.getTime()); - } - @SuppressWarnings("MagicConstant") private Date getCoordLastActionTime(CoordinatorJob coord) { if (coord.getNextMaterializedTime() != null) { @@ -1259,7 +1249,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime, Entity entity) throws FalconException { - if (endTime.compareTo(now()) <= 0) { + if (endTime.compareTo(DateUtil.now()) <= 0) { throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past"); } @@ -1296,7 +1286,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { LOG.info("Actions have materialized for this coord: {}, last action {}", coord.getId(), SchemaHelper.formatDateUTC(lastActionTime)); if (!endTime.after(lastActionTime)) { - Date pauseTime = offsetTime(endTime, -1); + Date pauseTime = DateUtil.offsetTime(endTime, -1*60); // set pause time which deletes future actions LOG.info("Setting pause time on coord: {} to {}", coord.getId(), SchemaHelper.formatDateUTC(pauseTime)); @@ -1352,7 +1342,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private Date getEffectiveTime(Cluster cluster, Entity newEntity) { //pick effective time as now() + 3 min to handle any time diff between falcon and oozie //oozie rejects changes with endtime < now - Date effectiveTime = offsetTime(now(), 3); + Date effectiveTime = DateUtil.offsetTime(DateUtil.now(), 3*60); //pick start time for new bundle which is after effectiveTime return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime); http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java index 2a2589e..2f68ddb 100644 --- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java +++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java @@ -29,7 +29,8 @@ public interface NotificationHandler { * When there are multiple notification handlers for the same event, * the priority determines which handler gets notified first. */ - enum PRIORITY {HIGH(5), MEDIUM(3), LOW(0); + enum PRIORITY {HIGH(10), MEDIUM(5), LOW(0); + private final int priority; http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java index ebc05ec..e0d2a0e 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java @@ -112,4 +112,12 @@ public interface DAGEngine { * @throws DAGEngineException */ Properties getConfiguration(String externalID) throws DAGEngineException; + + /** + * Re-builds the workflow. + * @param entity + * @param skipDryRun + * @return + */ + void touch(Entity entity, Boolean skipDryRun) throws DAGEngineException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java index ac7cde8..2c45fbd 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java @@ -24,6 +24,7 @@ import org.apache.falcon.LifeCycle; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.execution.EntityExecutor; @@ -37,12 +38,15 @@ import org.apache.falcon.state.EntityState; import org.apache.falcon.state.InstanceState; import org.apache.falcon.state.store.AbstractStateStore; import org.apache.falcon.state.store.StateStore; +import org.apache.falcon.util.DateUtil; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; @@ -340,7 +344,49 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine { @Override public String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException { - throw new FalconException("Not yet Implemented"); + EntityID id = new EntityID(entity); + // Ideally state store should have all entities, but, check anyway. + if (STATE_STORE.entityExists(id)) { + Date endTime = EntityUtil.getEndTime(entity, cluster); + if (endTime.before(DateUtil.now())) { + throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(endTime) + + " is before current time. Entity can't be touch-ed as it has completed."); + } + Collection<InstanceState> instances = + STATE_STORE.getExecutionInstances(entity, cluster, InstanceState.getRunningStates()); + // touch should happen irrespective of the state the entity is in. + DAGEngineFactory.getDAGEngine(cluster).touch(entity, (skipDryRun == null)? Boolean.FALSE : skipDryRun); + StringBuilder builder = new StringBuilder(); + builder.append(entity.toShortString()).append("/Effective Time: ") + .append(getEffectiveTime(entity, cluster, instances)); + return builder.toString(); + } + throw new FalconException("Could not find entity " + id + " in state store."); + } + + // Effective time will be right after the last running instance. + private String getEffectiveTime(Entity entity, String cluster, Collection<InstanceState> instances) + throws FalconException { + if (instances == null || instances.isEmpty()) { + return SchemaHelper.formatDateUTC(DateUtil.now()); + } else { + List<InstanceState> instanceList = new ArrayList(instances); + Collections.sort(instanceList, new Comparator<InstanceState>() { + @Override + public int compare(InstanceState x, InstanceState y) { + return (x.getInstance().getInstanceSequence() < y.getInstance().getInstanceSequence()) ? -1 + : (x.getInstance().getInstanceSequence() == y.getInstance().getInstanceSequence() ? 0 : 1); + } + }); + // Get the last element as the list is sorted in ascending order + Date lastRunningInstanceTime = instanceList.get(instanceList.size() - 1) + .getInstance().getInstanceTime().toDate(); + Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster); + // Offset the time by a few seconds, else nextStartTime will be same as the reference time. + Date effectiveTime = EntityUtil + .getNextStartTime(entity, clusterEntity, DateUtil.offsetTime(lastRunningInstanceTime, 10)); + return SchemaHelper.formatDateUTC(effectiveTime); + } } @Override http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java index 70c8353..a26eb77 100644 --- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java +++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java @@ -103,7 +103,7 @@ public class OozieDAGEngine implements DAGEngine { properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); return client.run(properties); } catch (OozieClientException e) { - LOG.error("Ozie client exception:", e); + LOG.error("Oozie client exception:", e); throw new DAGEngineException(e); } catch (FalconException e1) { LOG.error("Falcon Exception : ", e1); @@ -125,9 +125,21 @@ public class OozieDAGEngine implements DAGEngine { } } - private void dryRunInternal(Properties props) throws OozieClientException { - LOG.info("Dry run with properties {}", props); - client.dryrun(props); + private void dryRunInternal(Properties properties, Path buildPath, Entity entity) + throws OozieClientException, DAGEngineException { + if (properties == null) { + LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster); + throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty"); + } + + switchUser(); + LOG.debug("Logged in user is " + CurrentUser.getUser()); + properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); + properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); + properties.putAll(getDryRunProperties(entity)); + //Do dryrun before run as run is asynchronous + LOG.info("Dry run with properties {}", properties); + client.dryrun(properties); } private void switchUser() { @@ -230,18 +242,7 @@ public class OozieDAGEngine implements DAGEngine { prepareEntityBuildPath(entity); Path buildPath = EntityUtil.getNewStagingPath(cluster, entity); Properties properties = builder.build(cluster, buildPath); - if (properties == null) { - LOG.info("Entity {} is not scheduled on cluster {}", entity.getName(), cluster); - throw new DAGEngineException("Properties for entity " + entity.getName() + " is empty"); - } - - switchUser(); - LOG.debug("Logged in user is " + CurrentUser.getUser()); - properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser()); - properties.setProperty(OozieClient.APP_PATH, buildPath.toString()); - properties.putAll(getDryRunProperties(entity)); - //Do submit before run as run is asynchronous - dryRunInternal(properties); + dryRunInternal(properties, buildPath, entity); } catch (OozieClientException e) { LOG.error("Oozie client exception:", e); throw new DAGEngineException(e); @@ -360,6 +361,29 @@ public class OozieDAGEngine implements DAGEngine { return props; } + @Override + public void touch(Entity entity, Boolean skipDryRun) throws DAGEngineException { + // TODO : remove hardcoded Tag value when feed support is added. + try { + OozieOrchestrationWorkflowBuilder builder = + OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT); + if (!skipDryRun) { + Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis()); + Properties props = builder.build(cluster, buildPath); + dryRunInternal(props, buildPath, entity); + } + Path buildPath = EntityUtil.getNewStagingPath(cluster, entity); + // build it and forget it. The next run will always pick up from the latest staging path. + builder.build(cluster, buildPath); + } catch (FalconException fe) { + LOG.error("Falcon Exception : ", fe); + throw new DAGEngineException(fe); + } catch (OozieClientException e) { + LOG.error("Oozie client exception:", e); + throw new DAGEngineException(e); + } + } + // Get status of a workflow (with retry) and ensure it is one of statuses requested. private void assertStatus(String jobID, Job.Status... statuses) throws DAGEngineException { String actualStatus = null; http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java ---------------------------------------------------------------------- diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java index 087114f..d274ad7 100644 --- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java +++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java @@ -108,6 +108,10 @@ public class MockDAGEngine implements DAGEngine { return null; } + @Override + public void touch(Entity entity, Boolean skipDryRun) throws DAGEngineException { + } + public void addFailInstance(ExecutionInstance failInstance) { this.failInstances.add(failInstance); }
