Repository: falcon
Updated Branches:
  refs/heads/master 2804c5d1a -> 7f4ff1a37


FALCON-1512 Implement touch feature for native scheduler


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7f4ff1a3
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7f4ff1a3
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7f4ff1a3

Branch: refs/heads/master
Commit: 7f4ff1a37293305367b6ee3a33140eadfe1ac1b5
Parents: 2804c5d
Author: Pallavi Rao <[email protected]>
Authored: Tue Dec 15 14:49:35 2015 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Dec 15 14:49:35 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../java/org/apache/falcon/util/DateUtil.java   | 21 ++++++++
 .../workflow/engine/OozieWorkflowEngine.java    | 20 ++-----
 .../falcon/execution/NotificationHandler.java   |  3 +-
 .../falcon/workflow/engine/DAGEngine.java       |  8 +++
 .../workflow/engine/FalconWorkflowEngine.java   | 48 ++++++++++++++++-
 .../falcon/workflow/engine/OozieDAGEngine.java  | 56 ++++++++++++++------
 .../apache/falcon/execution/MockDAGEngine.java  |  4 ++
 8 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5753535..86a5c9a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1512 Implement touch feature for native scheduler (Pallavi Rao)
+
     FALCON-1233 Support co-existence of Oozie scheduler (coord) and Falcon native scheduler (Pallavi Rao)
 
     FALCON-1596 Spring shell based CLI for Falcon

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index b70fa20..baf5b13 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -78,4 +78,25 @@ public final class DateUtil {
             return null;
         }
     }
+
+    /**
+     * Returns the current time, with seconds and milliseconds reset to 0.
+     * @return
+     */
+    public static Date now() {
+        Calendar cal = Calendar.getInstance();
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        return cal.getTime();
+    }
+
+    /**
+     * Adds the supplied number of seconds to the given date and returns the 
new Date.
+     * @param date
+     * @param seconds
+     * @return
+     */
+    public static Date offsetTime(Date date, int seconds) {
+        return new Date(1000L * seconds + date.getTime());
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 91230b2..b486357 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -46,6 +46,7 @@ import org.apache.falcon.resource.InstancesSummaryResult;
 import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.update.UpdateHelper;
+import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
@@ -1130,7 +1131,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         if (bundle != MISSING && entityUpdated) {
             LOG.info("Updating entity through Workflow Engine {}", 
newEntity.toShortString());
             Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
-            if (newEndTime.before(now())) {
+            if (newEndTime.before(DateUtil.now())) {
                 throw new FalconException("Entity's end time " + 
SchemaHelper.formatDateUTC(newEndTime)
                     + " is before current time. Entity can't be updated. Use 
remove and add");
             }
@@ -1234,17 +1235,6 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
         return result.toString();
     }
 
-    private Date now() {
-        Calendar cal = Calendar.getInstance();
-        cal.set(Calendar.SECOND, 0);
-        cal.set(Calendar.MILLISECOND, 0);
-        return cal.getTime();
-    }
-
-    private Date offsetTime(Date date, int minute) {
-        return new Date(1000L * 60 * minute + date.getTime());
-    }
-
     @SuppressWarnings("MagicConstant")
     private Date getCoordLastActionTime(CoordinatorJob coord) {
         if (coord.getNextMaterializedTime() != null) {
@@ -1259,7 +1249,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
 
     private void updateCoords(String cluster, BundleJob bundle,
                               int concurrency, Date endTime, Entity entity) 
throws FalconException {
-        if (endTime.compareTo(now()) <= 0) {
+        if (endTime.compareTo(DateUtil.now()) <= 0) {
             throw new FalconException("End time " + 
SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
         }
 
@@ -1296,7 +1286,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
                 LOG.info("Actions have materialized for this coord: {}, last 
action {}",
                         coord.getId(), 
SchemaHelper.formatDateUTC(lastActionTime));
                 if (!endTime.after(lastActionTime)) {
-                    Date pauseTime = offsetTime(endTime, -1);
+                    Date pauseTime = DateUtil.offsetTime(endTime, -1*60);
                     // set pause time which deletes future actions
                     LOG.info("Setting pause time on coord: {} to {}",
                             coord.getId(), 
SchemaHelper.formatDateUTC(pauseTime));
@@ -1352,7 +1342,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
     private Date getEffectiveTime(Cluster cluster, Entity newEntity) {
         //pick effective time as now() + 3 min to handle any time diff between 
falcon and oozie
         //oozie rejects changes with endtime < now
-        Date effectiveTime = offsetTime(now(), 3);
+        Date effectiveTime = DateUtil.offsetTime(DateUtil.now(), 3*60);
 
         //pick start time for new bundle which is after effectiveTime
         return EntityUtil.getNextStartTime(newEntity, cluster, effectiveTime);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
index 2a2589e..2f68ddb 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/NotificationHandler.java
@@ -29,7 +29,8 @@ public interface NotificationHandler {
      * When there are multiple notification handlers for the same event,
      * the priority determines which handler gets notified first.
      */
-    enum PRIORITY {HIGH(5), MEDIUM(3), LOW(0);
+    enum PRIORITY {HIGH(10), MEDIUM(5), LOW(0);
+
 
         private final int priority;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
index ebc05ec..e0d2a0e 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -112,4 +112,12 @@ public interface DAGEngine {
      * @throws DAGEngineException
      */
     Properties getConfiguration(String externalID) throws DAGEngineException;
+
+    /**
+     * Re-builds the workflow.
+     * @param entity
+     * @param skipDryRun
+     * @return
+     */
+    void touch(Entity entity, Boolean skipDryRun) throws DAGEngineException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index ac7cde8..2c45fbd 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -24,6 +24,7 @@ import org.apache.falcon.LifeCycle;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.execution.EntityExecutor;
@@ -37,12 +38,15 @@ import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.util.DateUtil;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -340,7 +344,49 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
 
     @Override
     public String touch(Entity entity, String cluster, Boolean skipDryRun) 
throws FalconException {
-        throw new FalconException("Not yet Implemented");
+        EntityID id = new EntityID(entity);
+        // Ideally state store should have all entities, but, check anyway.
+        if (STATE_STORE.entityExists(id)) {
+            Date endTime = EntityUtil.getEndTime(entity, cluster);
+            if (endTime.before(DateUtil.now())) {
+                throw new FalconException("Entity's end time " + 
SchemaHelper.formatDateUTC(endTime)
+                        + " is before current time. Entity can't be touch-ed 
as it has completed.");
+            }
+            Collection<InstanceState> instances =
+                    STATE_STORE.getExecutionInstances(entity, cluster, 
InstanceState.getRunningStates());
+            // touch should happen irrespective of the state the entity is in.
+            DAGEngineFactory.getDAGEngine(cluster).touch(entity, (skipDryRun 
== null)? Boolean.FALSE : skipDryRun);
+            StringBuilder builder = new StringBuilder();
+            builder.append(entity.toShortString()).append("/Effective Time: ")
+                    .append(getEffectiveTime(entity, cluster, instances));
+            return builder.toString();
+        }
+        throw new FalconException("Could not find entity " + id + " in state 
store.");
+    }
+
+    // Effective time will be right after the last running instance.
+    private String getEffectiveTime(Entity entity, String cluster, 
Collection<InstanceState> instances)
+        throws FalconException {
+        if (instances == null || instances.isEmpty()) {
+            return SchemaHelper.formatDateUTC(DateUtil.now());
+        } else {
+            List<InstanceState> instanceList = new ArrayList(instances);
+            Collections.sort(instanceList, new Comparator<InstanceState>() {
+                @Override
+                public int compare(InstanceState x, InstanceState y) {
+                    return (x.getInstance().getInstanceSequence() < 
y.getInstance().getInstanceSequence()) ? -1
+                            : (x.getInstance().getInstanceSequence() == 
y.getInstance().getInstanceSequence() ? 0 : 1);
+                }
+            });
+            // Get the last element as the list is sorted in ascending order
+            Date lastRunningInstanceTime = 
instanceList.get(instanceList.size() - 1)
+                    .getInstance().getInstanceTime().toDate();
+            Cluster clusterEntity = 
ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
+            // Offset the time by a few seconds, else nextStartTime will be 
same as the reference time.
+            Date effectiveTime = EntityUtil
+                    .getNextStartTime(entity, clusterEntity, 
DateUtil.offsetTime(lastRunningInstanceTime, 10));
+            return SchemaHelper.formatDateUTC(effectiveTime);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
index 70c8353..a26eb77 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/OozieDAGEngine.java
@@ -103,7 +103,7 @@ public class OozieDAGEngine implements DAGEngine {
             properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
             return client.run(properties);
         } catch (OozieClientException e) {
-            LOG.error("Ozie client exception:", e);
+            LOG.error("Oozie client exception:", e);
             throw new DAGEngineException(e);
         } catch (FalconException e1) {
             LOG.error("Falcon Exception : ", e1);
@@ -125,9 +125,21 @@ public class OozieDAGEngine implements DAGEngine {
         }
     }
 
-    private void dryRunInternal(Properties props) throws OozieClientException {
-        LOG.info("Dry run with properties {}", props);
-        client.dryrun(props);
+    private void dryRunInternal(Properties properties, Path buildPath, Entity 
entity)
+        throws OozieClientException, DAGEngineException {
+        if (properties == null) {
+            LOG.info("Entity {} is not scheduled on cluster {}", 
entity.getName(), cluster);
+            throw new DAGEngineException("Properties for entity " + 
entity.getName() + " is empty");
+        }
+
+        switchUser();
+        LOG.debug("Logged in user is " + CurrentUser.getUser());
+        properties.setProperty(OozieClient.USER_NAME, CurrentUser.getUser());
+        properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
+        properties.putAll(getDryRunProperties(entity));
+        //Do dryrun before run as run is asynchronous
+        LOG.info("Dry run with properties {}", properties);
+        client.dryrun(properties);
     }
 
     private void switchUser() {
@@ -230,18 +242,7 @@ public class OozieDAGEngine implements DAGEngine {
             prepareEntityBuildPath(entity);
             Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
             Properties properties = builder.build(cluster, buildPath);
-            if (properties == null) {
-                LOG.info("Entity {} is not scheduled on cluster {}", 
entity.getName(), cluster);
-                throw new DAGEngineException("Properties for entity " + 
entity.getName() + " is empty");
-            }
-
-            switchUser();
-            LOG.debug("Logged in user is " + CurrentUser.getUser());
-            properties.setProperty(OozieClient.USER_NAME, 
CurrentUser.getUser());
-            properties.setProperty(OozieClient.APP_PATH, buildPath.toString());
-            properties.putAll(getDryRunProperties(entity));
-            //Do submit before run as run is asynchronous
-            dryRunInternal(properties);
+            dryRunInternal(properties, buildPath, entity);
         } catch (OozieClientException e) {
             LOG.error("Oozie client exception:", e);
             throw new DAGEngineException(e);
@@ -360,6 +361,29 @@ public class OozieDAGEngine implements DAGEngine {
         return props;
     }
 
+    @Override
+    public void touch(Entity entity, Boolean skipDryRun) throws 
DAGEngineException {
+        // TODO : remove hardcoded Tag value when feed support is added.
+        try {
+            OozieOrchestrationWorkflowBuilder builder =
+                    OozieOrchestrationWorkflowBuilder.get(entity, cluster, 
Tag.DEFAULT);
+            if (!skipDryRun) {
+                Path buildPath = new Path("/tmp", "falcon" + entity.getName() 
+ System.currentTimeMillis());
+                Properties props = builder.build(cluster, buildPath);
+                dryRunInternal(props, buildPath, entity);
+            }
+            Path buildPath = EntityUtil.getNewStagingPath(cluster, entity);
+            // build it and forget it. The next run will always pick up from 
the latest staging path.
+            builder.build(cluster, buildPath);
+        } catch (FalconException fe) {
+            LOG.error("Falcon Exception : ", fe);
+            throw new DAGEngineException(fe);
+        } catch (OozieClientException e) {
+            LOG.error("Oozie client exception:", e);
+            throw new DAGEngineException(e);
+        }
+    }
+
     // Get status of a workflow (with retry) and ensure it is one of statuses 
requested.
     private void assertStatus(String jobID, Job.Status... statuses) throws 
DAGEngineException {
         String actualStatus = null;

http://git-wip-us.apache.org/repos/asf/falcon/blob/7f4ff1a3/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
index 087114f..d274ad7 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/MockDAGEngine.java
@@ -108,6 +108,10 @@ public class MockDAGEngine implements DAGEngine {
         return null;
     }
 
+    @Override
+    public void touch(Entity entity, Boolean skipDryRun) throws 
DAGEngineException {
+    }
+
     public void addFailInstance(ExecutionInstance failInstance) {
         this.failInstances.add(failInstance);
     }

Reply via email to