abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1875

Change subject: WIP
......................................................................

WIP

Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
---
M asterixdb/asterix-active/pom.xml
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
D 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
R 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
R 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
R 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
A 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
A 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
A 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
R 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
53 files changed, 775 insertions(+), 477 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/75/1875/1

diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..fe5efbe 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,6 +31,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 1141912..434801a 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -30,7 +30,8 @@
         JOB_FINISHED,
         PARTITION_EVENT,
         EXTENSION_EVENT,
-        STATS_UPDATED
+        STATS_UPDATED,
+        FALURE
     }
 
     private final JobId jobId;
@@ -43,10 +44,6 @@
         this.entityId = entityId;
         this.eventKind = eventKind;
         this.eventObject = eventObject;
-    }
-
-    public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
-        this(jobId, eventKind, entityId, null);
     }
 
     public JobId getJobId() {
@@ -79,8 +76,8 @@
             return true;
         }
         ActiveEvent other = (ActiveEvent) o;
-        return Objects.equals(entityId, other.entityId) && 
Objects.equals(eventKind, other.eventKind) && Objects
-                .equals(eventObject, other.eventObject);
+        return Objects.equals(entityId, other.entityId) && 
Objects.equals(eventKind, other.eventKind)
+                && Objects.equals(eventObject, other.eventObject);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
deleted file mode 100644
index 86c3e7d..0000000
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveLifecycleListener implements IJobLifecycleListener {
-
-    private static final Logger LOGGER = 
Logger.getLogger(ActiveLifecycleListener.class.getName());
-
-    private final ActiveJobNotificationHandler notificationHandler;
-    private final LinkedBlockingQueue<ActiveEvent> jobEventInbox;
-    private final ExecutorService executorService;
-
-    public ActiveLifecycleListener() {
-        notificationHandler = new ActiveJobNotificationHandler();
-        jobEventInbox = notificationHandler.getEventInbox();
-        executorService = Executors.newSingleThreadExecutor();
-        executorService.execute(notificationHandler);
-    }
-
-    @Override
-    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
-        EntityId entityId = notificationHandler.getEntity(jobId);
-        if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, 
entityId));
-        }
-    }
-
-    @Override
-    public synchronized void notifyJobFinish(JobId jobId) throws 
HyracksException {
-        EntityId entityId = notificationHandler.getEntity(jobId);
-        if (entityId != null) {
-            jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, 
entityId));
-        } else {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
-            }
-        }
-    }
-
-    @Override
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException {
-        notificationHandler.notifyJobCreation(jobId, spec);
-    }
-
-    public void receive(ActivePartitionMessage message) {
-        jobEventInbox.add(new ActiveEvent(message.getJobId(), 
Kind.PARTITION_EVENT,
-                message.getActiveRuntimeId().getEntityId(), message));
-    }
-
-    public void stop() {
-        executorService.shutdown();
-    }
-
-    public ActiveJobNotificationHandler getNotificationHandler() {
-        return notificationHandler;
-    }
-}
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
similarity index 65%
rename from 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
rename to 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
index d2b8a89..0853144 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
@@ -19,50 +19,61 @@
 package org.apache.asterix.active;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 
-public class ActiveJobNotificationHandler implements Runnable {
-    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
-    private static final Logger LOGGER = 
Logger.getLogger(ActiveJobNotificationHandler.class.getName());
+public class ActiveNotificationHandler implements IJobLifecycleListener, 
Runnable {
+
+    private static final Logger LOGGER = 
Logger.getLogger(ActiveNotificationHandler.class.getName());
     private static final boolean DEBUG = false;
+    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
     private final LinkedBlockingQueue<ActiveEvent> eventInbox;
     private final Map<EntityId, IActiveEntityEventsListener> 
entityEventListeners;
-    private final Map<JobId, EntityId> jobId2ActiveJobInfos;
+    private final Map<JobId, EntityId> jobId2EntityId;
+    private final ExecutorService executorService;
 
-    public ActiveJobNotificationHandler() {
-        this.eventInbox = new LinkedBlockingQueue<>();
-        this.jobId2ActiveJobInfos = new HashMap<>();
-        this.entityEventListeners = new HashMap<>();
+    public ActiveNotificationHandler() {
+        eventInbox = new LinkedBlockingQueue<>();
+        jobId2EntityId = new HashMap<>();
+        entityEventListeners = new HashMap<>();
+        executorService = Executors.newSingleThreadExecutor();
+        executorService.execute(this);
     }
 
     @Override
     public void run() {
-        
Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
-        LOGGER.log(Level.INFO, "Started " + 
ActiveJobNotificationHandler.class.getSimpleName());
+        
Thread.currentThread().setName(ActiveNotificationHandler.class.getSimpleName());
+        LOGGER.log(Level.INFO, "Started " + 
ActiveNotificationHandler.class.getSimpleName());
         while (!Thread.interrupted()) {
             try {
-                ActiveEvent event = getEventInbox().take();
-                EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId());
+                ActiveEvent event = eventInbox.take();
+                EntityId entityId = jobId2EntityId.get(event.getJobId());
                 if (entityId != null) {
                     IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
                     LOGGER.log(Level.FINER, "Next event is of type " + 
event.getEventKind());
                     if (event.getEventKind() == Kind.JOB_FINISHED) {
                         LOGGER.log(Level.FINER, "Removing the job");
-                        jobId2ActiveJobInfos.remove(event.getJobId());
+                        jobId2EntityId.remove(event.getJobId());
                     }
                     if (listener != null) {
                         LOGGER.log(Level.FINER, "Notifying the listener");
                         listener.notify(event);
                     }
-
                 } else {
                     LOGGER.log(Level.SEVERE, "Entity not found for received 
message for job " + event.getJobId());
                 }
@@ -72,15 +83,40 @@
                 LOGGER.log(Level.SEVERE, "Error handling an active job event", 
e);
             }
         }
-        LOGGER.log(Level.INFO, "Stopped " + 
ActiveJobNotificationHandler.class.getSimpleName());
+        LOGGER.log(Level.INFO, "Stopped " + 
ActiveNotificationHandler.class.getSimpleName());
     }
 
-    public synchronized void removeListener(IActiveEntityEventsListener 
listener) throws HyracksDataException {
-        LOGGER.log(Level.FINER, "Removing the listener since it is not active 
anymore");
-        unregisterListener(listener);
+    @Override
+    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
+        EntityId entityId = getEntity(jobId);
+        if (entityId != null) {
+            eventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, 
null));
+        }
     }
 
-    public IActiveEntityEventsListener getActiveEntityListener(EntityId 
entityId) {
+    @Override
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
+            throws HyracksException {
+        EntityId entityId = getEntity(jobId);
+        if (entityId != null) {
+            eventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
+        } else {
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+            }
+        }
+    }
+
+    public void receive(ActivePartitionMessage message) {
+        eventInbox.add(new ActiveEvent(message.getJobId(), 
Kind.PARTITION_EVENT,
+                message.getActiveRuntimeId().getEntityId(), message));
+    }
+
+    public void stop() {
+        executorService.shutdown();
+    }
+
+    public IActiveEntityEventsListener getListener(EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId 
entityId) was called with entity " + entityId);
             IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
@@ -90,10 +126,11 @@
     }
 
     public EntityId getEntity(JobId jobId) {
-        return jobId2ActiveJobInfos.get(jobId);
+        return jobId2EntityId.get(jobId);
     }
 
-    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) {
+    @Override
+    public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
         LOGGER.log(Level.FINER,
                 "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = " + jobId);
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
@@ -103,18 +140,11 @@
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
-        boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+        boolean found = jobId2EntityId.get(jobId) != null;
         LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
-        IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-        if (listener != null) {
-            // It is okay to bypass the event inbox in this case because we 
know this is the first event for this entity
-            listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
+        if (!eventInbox.offer(new ActiveEvent(jobId, Kind.JOB_CREATED, 
entityId, jobSpecification))) {
+            throw new HyracksDataException("Full active event inbox");
         }
-        LOGGER.log(Level.FINER, "Listener was notified" + jobId);
-    }
-
-    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
-        return eventInbox;
     }
 
     public synchronized IActiveEntityEventsListener[] getEventListeners() {
@@ -147,14 +177,14 @@
         }
     }
 
-    public synchronized void monitorJob(JobId jobId, EntityId activeJob) {
+    public synchronized void monitorJob(JobId jobId, EntityId entityId) {
         if (DEBUG) {
             LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob 
activeJob) called with job id: " + jobId);
-            boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+            boolean found = jobId2EntityId.get(jobId) != null;
             LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? 
"Active" : "Inactive"));
         }
-        if (entityEventListeners.containsKey(activeJob)) {
-            if (jobId2ActiveJobInfos.containsKey(jobId)) {
+        if (entityEventListeners.containsKey(entityId)) {
+            if (jobId2EntityId.containsKey(jobId)) {
                 LOGGER.severe("Job is already being monitored for job: " + 
jobId);
                 return;
             }
@@ -162,8 +192,8 @@
                 LOGGER.log(Level.WARNING, "monitoring started for job id: " + 
jobId);
             }
         } else {
-            LOGGER.severe("No listener was found for the entity: " + 
activeJob);
+            LOGGER.severe("No listener was found for the entity: " + entityId);
         }
-        jobId2ActiveJobInfos.put(jobId, activeJob);
+        jobId2EntityId.put(jobId, entityId);
     }
 }
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index af8f5ca..1430eb8 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -20,27 +20,36 @@
 
 public enum ActivityState {
     /**
-     * The initial state of an activity.
-     */
-    CREATED,
-    /**
-     * The starting state and a possible terminal state. Next state can only 
be {@code ActivityState.STARTING}
+     * The starting state and a possible terminal state. Next state can only 
be {@code ActivityState.STARTING}
      */
     STOPPED,
     /**
-     * A terminal state
+     * An unexpected failure caused the activity to stop
      */
     FAILED,
     /**
-     * An intermediate state. Next state can only be {@code 
ActivityState.STARTED} or {@code ActivityState.FAILED}
+     * An intermediate state. Next state can only be {@code 
ActivityState.RUNNING} or {@code ActivityState.FAILED}
      */
     STARTING,
     /**
-     * An intermediate state. Next state can only be {@code 
ActivityState.STOPPING} or {@code ActivityState.FAILED}
+     * An intermediate state. Next state can only be
+     * {@code ActivityState.STOPPING}, {@code ActivityState.SUSPENDING}, or 
{@code ActivityState.FAILED}
      */
-    STARTED,
+    RUNNING,
     /**
      * An intermediate state. Next state can only be {@code 
ActivityState.STOPPED} or {@code ActivityState.FAILED}
      */
-    STOPPING
+    STOPPING,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.SUSPENDED} or {@code ActivityState.FAILED}
+     */
+    SUSPENDING,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.STOPPED} or {@code ActivityState.RESUMING}
+     */
+    SUSPENDED,
+    /**
+     * An intermediate state. Next state can only be {@code 
ActivityState.RUNNING} or {@code ActivityState.FAILED}
+     */
+    RESUMING
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 4bc02f3..9cbb2c1 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -79,4 +79,23 @@
      * @throws HyracksDataException
      */
     void refreshStats(long timeout) throws HyracksDataException;
+
+    /**
+     * resume the activity
+     *
+     * @throws HyracksDataException
+     */
+    void resume() throws HyracksDataException;
+
+    /**
+     * indicates that a dataset is no longer being used by this active entity
+     *
+     * @param dataset
+     */
+    void remove(IDataset dataset);
+
+    /**
+     * @return true, if entity is active, false otherwise
+     */
+    boolean isActive();
 }
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
index 69f7f1c..da7be77 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
@@ -43,14 +43,9 @@
     /**
      * Wait until the terminal event has been received
      *
-     * @throws InterruptedException
+     * @throws Exception
      */
-    void sync() throws InterruptedException;
-
-    /**
-     * Stop watching events
-     */
-    void unsubscribe();
+    void sync() throws Exception;
 
     /**
      * callback upon successful subscription
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
similarity index 98%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
rename to 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
index 40bc7d8..1344f5f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.active;
 
 import java.io.Serializable;
 import java.util.Map;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
similarity index 96%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
rename to 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
index 48df79b..349c7d7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.api;
+package org.apache.asterix.active;
 
 import java.io.Serializable;
 
@@ -29,6 +29,7 @@
  * to be implemented by each adapter irrespective of the the kind of
  * adapter(pull or push).
  */
+@FunctionalInterface
 public interface IDataSourceAdapter extends Serializable {
 
     public enum AdapterType {
@@ -38,6 +39,7 @@
 
     /**
      * Triggers the adapter to begin ingesting data from the external source.
+     * 
      * @param partition
      *            The adapter could be running with a degree of parallelism.
      *            partition corresponds to the i'th parallel instance.
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
new file mode 100644
index 0000000..a010984
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicy {
+    /**
+     * @return true if one more attempt should be made
+     */
+    boolean retry();
+}
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
new file mode 100644
index 0000000..eae568c
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicyFactory {
+    /**
+     * @return an instance of retry policy
+     */
+    IRetryPolicy create();
+}
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
new file mode 100644
index 0000000..af6be01
--- /dev/null
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class NoRetryPolicyFactory implements IRetryPolicyFactory {
+    public static final NoRetryPolicyFactory INSTANCE = new 
NoRetryPolicyFactory();
+    private static final IRetryPolicy policy = () -> false;
+
+    private NoRetryPolicyFactory() {
+    }
+
+    @Override
+    public IRetryPolicy create() {
+        return policy;
+    }
+}
diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9391044..e0388c2 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -21,7 +21,7 @@
 import java.io.Serializable;
 import java.util.Objects;
 
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -64,7 +64,7 @@
 
     @Override
     public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        ActiveNotificationHandler activeListener = (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         activeListener.receive(this);
     }
 
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 19f0dcc..02ea321 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -23,6 +23,8 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.lang.common.statement.Query;
@@ -164,4 +166,14 @@
      */
     String getActiveDataverseName(String dataverse);
 
+    /**
+     * @return the storage component provider
+     */
+    IStorageComponentProvider getComponentProvider();
+
+    /**
+     * @return the application context
+     */
+    ICcApplicationContext getAppCtx();
+
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
index 593faa6..ed8d430 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -18,14 +18,12 @@
  */
 package org.apache.asterix.api.http.server;
 
-import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.external.feed.watch.StatsSubscriber;
@@ -38,19 +36,21 @@
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 public class ActiveStatsApiServlet extends AbstractServlet {
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveStatsApiServlet.class.getName());
     private static final int DEFAULT_EXPIRE_TIME = 2000;
-    private final ActiveLifecycleListener activeLifecycleListener;
+    private final ActiveNotificationHandler activeLifecycleListener;
 
     public ActiveStatsApiServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, ICcApplicationContext appCtx) {
         super(ctx, paths);
-        this.activeLifecycleListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
+        this.activeLifecycleListener = (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
     }
 
     private JsonNode constructNode(ObjectMapper om, 
IActiveEntityEventsListener eventListener, long currentTime,
-            long ttl) throws InterruptedException, IOException {
+            long ttl) throws Exception {
         long statsTimeStamp = eventListener.getStatsTimeStamp();
         if (currentTime - statsTimeStamp > ttl) {
             StatsSubscriber subscriber = new StatsSubscriber(eventListener);
@@ -66,7 +66,7 @@
         // Obtain all feed status
         String localPath = localPath(request);
         int expireTime;
-        IActiveEntityEventsListener[] listeners = 
activeLifecycleListener.getNotificationHandler().getEventListeners();
+        IActiveEntityEventsListener[] listeners = 
activeLifecycleListener.getEventListeners();
         ObjectMapper om = new ObjectMapper();
         om.enable(SerializationFeature.INDENT_OUTPUT);
         ObjectNode resNode = om.createObjectNode();
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
similarity index 65%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
rename to 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 3216bfe..63ac5ab 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.feed.management;
+package org.apache.asterix.app.active;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,21 +27,24 @@
 
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -51,41 +54,53 @@
 public class ActiveEntityEventsListener implements IActiveEntityEventsListener 
{
 
     private static final Logger LOGGER = 
Logger.getLogger(ActiveEntityEventsListener.class.getName());
-
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
-    }
-
-    // members
-    protected volatile ActivityState state;
-    protected JobId jobId;
+    // finals
+    protected final ActiveNotificationHandler handler;
     protected final List<IActiveEventSubscriber> subscribers = new 
ArrayList<>();
-    protected final ICcApplicationContext appCtx;
+    protected final IStatementExecutor statementExecutor;
+    protected final MetadataProvider metadataProvider;
+    protected final IHyracksClientConnection hcc;
     protected final EntityId entityId;
-    protected final List<IDataset> datasets;
+    protected final List<? extends IDataset> datasets;
     protected final ActiveEvent statsUpdatedEvent;
+    protected final String runtimeName;
+    protected final IRetryPolicyFactory retryPolicyFactory;
+    // mutables
+    protected volatile ActivityState state;
+    protected AlgebricksAbsolutePartitionConstraint locations;
+    protected ActivityState prevState;
+    protected JobId jobId;
     protected long statsTimestamp;
     protected String stats;
-    protected RequestState statsRequestState;
-    protected final String runtimeName;
-    protected final AlgebricksAbsolutePartitionConstraint locations;
+    protected boolean isFetchingStats;
     protected int numRegistered;
 
-    public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId 
entityId, List<IDataset> datasets,
-            AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName) {
-        this.appCtx = appCtx;
+    public ActiveEntityEventsListener(IStatementExecutor statementExecutor, 
IHyracksClientConnection hcc,
+            EntityId entityId, List<? extends IDataset> datasets, 
AlgebricksAbsolutePartitionConstraint locations,
+            String runtimeName, IRetryPolicyFactory retryPolicyFactory) throws 
HyracksDataException {
+        this.statementExecutor = statementExecutor;
+        this.metadataProvider =
+                new MetadataProvider(statementExecutor.getAppCtx(), null, 
statementExecutor.getComponentProvider());
+        this.hcc = hcc;
         this.entityId = entityId;
         this.datasets = datasets;
+        this.retryPolicyFactory = retryPolicyFactory;
         this.state = ActivityState.STOPPED;
         this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
-        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId);
+        this.isFetchingStats = false;
+        this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId, null);
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
         this.locations = locations;
         this.numRegistered = 0;
+        this.handler =
+                ((ActiveNotificationHandler) 
metadataProvider.getApplicationContext().getActiveNotificationHandler());
+        handler.registerListener(this);
+    }
+
+    protected void setState(ActivityState newState) {
+        this.prevState = state;
+        this.state = newState;
     }
 
     @Override
@@ -95,13 +110,13 @@
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
-                    state = ActivityState.CREATED;
+                    jobCreated(event);
                     break;
                 case JOB_STARTED:
                     start(event);
                     break;
                 case JOB_FINISHED:
-                    finish();
+                    finish(event);
                     break;
                 case PARTITION_EVENT:
                     handle((ActivePartitionMessage) event.getEventObject());
@@ -116,26 +131,29 @@
         }
     }
 
+    protected synchronized void jobCreated(ActiveEvent event) {
+        setState(ActivityState.STARTING);
+    }
+
     protected synchronized void handle(ActivePartitionMessage message) {
         if (message.getEvent() == 
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
-                state = ActivityState.STARTED;
+                setState(ActivityState.RUNNING);
             }
         }
     }
 
-    private void finish() throws Exception {
-        IHyracksClientConnection hcc = appCtx.getHcc();
-        JobStatus status = hcc.getJobStatus(jobId);
-        state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED : 
ActivityState.STOPPED;
-        ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        activeLcListener.getNotificationHandler().removeListener(this);
+    @SuppressWarnings("unchecked")
+    protected void finish(ActiveEvent event) throws HyracksDataException {
+        jobId = null;
+        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
+        setState(status.getLeft().equals(JobStatus.FAILURE) ? 
ActivityState.FAILED : ActivityState.STOPPED);
     }
 
-    private void start(ActiveEvent event) {
+    protected void start(ActiveEvent event) {
         this.jobId = event.getJobId();
-        state = ActivityState.STARTING;
+        numRegistered = 0;
     }
 
     @Override
@@ -161,7 +179,12 @@
 
     @Override
     public boolean isEntityUsingDataset(IDataset dataset) {
-        return datasets.contains(dataset);
+        return isActive() && datasets.contains(dataset);
+    }
+
+    @Override
+    public void remove(IDataset dataset) {
+        datasets.remove(dataset);
     }
 
     public JobId getJobId() {
@@ -193,15 +216,16 @@
     public void refreshStats(long timeout) throws HyracksDataException {
         LOGGER.log(Level.INFO, "refreshStats called");
         synchronized (this) {
-            if (state != ActivityState.STARTED || statsRequestState == 
RequestState.STARTED) {
-                LOGGER.log(Level.INFO, "returning immediately since state = " 
+ state + " and statsRequestState = "
-                        + statsRequestState);
+            if (state != ActivityState.RUNNING || isFetchingStats) {
+                LOGGER.log(Level.INFO,
+                        "returning immediately since state = " + state + " and 
fetchingStats = " + isFetchingStats);
                 return;
             } else {
-                statsRequestState = RequestState.STARTED;
+                isFetchingStats = true;
             }
         }
-        ICCMessageBroker messageBroker = (ICCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        ICCMessageBroker messageBroker =
+                (ICCMessageBroker) 
metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
         long reqId = messageBroker.newRequestId();
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
@@ -217,8 +241,7 @@
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
-        // Same as above
-        statsRequestState = RequestState.FINISHED;
+        isFetchingStats = false;
     }
 
     protected synchronized void notifySubscribers(ActiveEvent event) {
@@ -245,4 +268,29 @@
         return locations;
     }
 
+    public void suspend() throws HyracksDataException {
+        throw new HyracksDataException("Unsupported");
+    }
+
+    @Override
+    public void resume() throws HyracksDataException {
+        throw new HyracksDataException("Unsupported");
+    }
+
+    @Override
+    public boolean isActive() {
+        return state != ActivityState.STOPPED && state != ActivityState.FAILED;
+    }
+
+    public void unregister() throws HyracksDataException {
+        handler.unregisterListener(this);
+    }
+
+    public void start(MetadataProvider metadataProvider) throws 
HyracksDataException {
+        throw new HyracksDataException("Unsupported");
+    }
+
+    public void stop(MetadataProvider metadataProvider) throws 
HyracksDataException {
+        throw new HyracksDataException("Unsupported");
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
new file mode 100644
index 0000000..9dd1f54
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.utils.FeedOperations;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Events listener for a feed entity. In addition to what it inherits from {@link ActiveEntityEventsListener},
+ * it builds and runs the feed job on {@link #start(MetadataProvider)} and asks the feed locations to stop on
+ * {@link #stop(MetadataProvider)}.
+ */
+public class FeedEventsListener extends ActiveEntityEventsListener {
+
+    private static final Logger LOGGER = Logger.getLogger(FeedEventsListener.class.getName());
+    private final Feed feed;
+    // NOTE(review): kept by reference and pruned in remove(IDataset); confirm callers expect that.
+    private final List<FeedConnection> feedConnections;
+
+    /**
+     * Creates a listener for {@code feed}. Every argument except {@code feed} and {@code feedConnections} is
+     * handed to the superclass constructor unchanged.
+     *
+     * @param feed
+     *            the feed handled by this listener
+     * @param feedConnections
+     *            the connections used when building the feed job
+     * @throws HyracksDataException
+     *             if the superclass constructor throws
+     */
+    public FeedEventsListener(IStatementExecutor statementExecutor, IHyracksClientConnection hcc, EntityId entityId,
+            List<? extends IDataset> datasets, AlgebricksAbsolutePartitionConstraint locations, String runtimeName,
+            IRetryPolicyFactory retryPolicyFactory, Feed feed, final List<FeedConnection> feedConnections)
+            throws HyracksDataException {
+        super(statementExecutor, hcc, entityId, datasets, locations, runtimeName, retryPolicyFactory);
+        this.feed = feed;
+        this.feedConnections = feedConnections;
+    }
+
+    /**
+     * Removes {@code dataset} from this listener and drops every feed connection whose dataverse name and
+     * dataset name both equal those of {@code dataset}.
+     */
+    @Override
+    public synchronized void remove(IDataset dataset) {
+        super.remove(dataset);
+        feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName())
+                && o.getDatasetName().equals(dataset.getDatasetName()));
+    }
+
+    /**
+     * Builds the feed job, runs it and then blocks in {@code sync()} on a subscriber created for
+     * {@link ActivityState#RUNNING}. Any exception raised on the way moves the listener to
+     * {@link ActivityState#FAILED} before it is rethrown as a {@link HyracksDataException}.
+     * <p>
+     * NOTE(review): the monitor of this listener is held while {@code sync()} blocks; verify that the subscriber
+     * waits on this same monitor, otherwise the synchronized event handlers of the superclass cannot run.
+     *
+     * @param metadataProvider
+     *            used to build the job and to read the {@code WAIT_FOR_COMPLETION} setting
+     * @throws HyracksDataException
+     *             if the feed is still active, or if building or running the job fails
+     */
+    @Override
+    public synchronized void start(MetadataProvider metadataProvider) throws HyracksDataException {
+        if (isActive()) {
+            throw new HyracksDataException("Feed " + entityId.getEntityName() + " is started already.");
+        }
+        try {
+            setState(ActivityState.STARTING);
+            Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo =
+                    FeedOperations.buildStartFeedJob(statementExecutor, metadataProvider, feed, feedConnections, hcc);
+            JobSpecification feedJob = jobInfo.getLeft();
+            IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(this, ActivityState.RUNNING);
+            feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId);
+            // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs.
+            // We will need to design a general exception handling mechanism for feeds.
+            locations = jobInfo.getRight();
+            // parseBoolean yields the primitive directly; a missing setting still reads as false.
+            boolean waitForCompletion =
+                    Boolean.parseBoolean(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION));
+            JobUtils.runJob(hcc, feedJob, waitForCompletion);
+            eventSubscriber.sync();
+        } catch (Exception e) {
+            setState(ActivityState.FAILED);
+            throw HyracksDataException.create(e);
+        }
+        LOGGER.log(Level.INFO, "Submitted");
+    }
+
+    /**
+     * Sends a stop message to every location of the feed and then blocks in {@code sync()} on a subscriber
+     * created for {@link ActivityState#STOPPED}. When there is no job id, the state is set to
+     * {@link ActivityState#STOPPED} without contacting any node.
+     *
+     * @param metadataProvider
+     *            supplies the application context used to send the stop messages
+     * @throws HyracksDataException
+     *             if the feed is already stopped, or if sending the messages or waiting fails
+     */
+    @Override
+    public synchronized void stop(MetadataProvider metadataProvider) throws HyracksDataException {
+        // The two precondition checks stay outside the catch-all so their outcome is never re-wrapped.
+        if (state == ActivityState.STOPPED) {
+            throw new HyracksDataException("Feed " + entityId.getEntityName() + " is not started.");
+        }
+        if (jobId == null) {
+            setState(ActivityState.STOPPED);
+            return;
+        }
+        try {
+            IActiveEventSubscriber eventSubscriber = new WaitForStateSubscriber(this, ActivityState.STOPPED);
+            // Read the location array once instead of on every iteration.
+            String[] intakeLocations = getLocations().getLocations();
+            for (int i = 0; i < intakeLocations.length; i++) {
+                FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(), entityId,
+                        intakeLocations[i], i);
+            }
+            eventSubscriber.sync();
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * @return the feed handled by this listener
+     */
+    public Feed getFeed() {
+        return feed;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 0e72976..bbfe680 100755
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -33,11 +33,11 @@
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Unmarshaller;
 
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.external.library.ExternalLibrary;
 import org.apache.asterix.external.library.LibraryAdapter;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..0f3b005 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -41,17 +41,18 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.api.common.APIFramework;
 import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.app.active.FeedEventsListener;
 import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.config.ClusterProperties;
@@ -71,16 +72,12 @@
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.lang.common.base.IReturningStatement;
 import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -178,7 +175,6 @@
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Triple;
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
@@ -210,7 +206,7 @@
 
     public static final boolean IS_DEBUG_MODE = false;// true
     protected final List<Statement> statements;
-    protected final ICcApplicationContext appCtx;
+    private final ICcApplicationContext appCtx;
     protected final SessionOutput sessionOutput;
     protected final SessionConfig sessionConfig;
     protected Dataverse activeDataverse;
@@ -693,8 +689,8 @@
     protected static void 
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
             throws CompilationException {
         StringBuilder builder = null;
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
             if (listener.isEntityUsingDataset(dataset)) {
@@ -1169,26 +1165,31 @@
                     throw new AlgebricksException("There is no dataverse with 
this name " + dataverseName + ".");
                 }
             }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
             // # disconnect all feeds from any datasets in the dataverse.
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
             IActiveEntityEventsListener[] activeListeners = 
activeEventHandler.getEventListeners();
-            Identifier dvId = new Identifier(dataverseName);
-            MetadataProvider tempMdProvider = new MetadataProvider(appCtx, 
metadataProvider.getDefaultDataverse(),
-                    metadataProvider.getStorageComponentProvider());
-            tempMdProvider.setConfig(metadataProvider.getConfig());
             for (IActiveEntityEventsListener listener : activeListeners) {
                 EntityId activeEntityId = listener.getEntityId();
                 if 
(activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
                         && 
activeEntityId.getDataverse().equals(dataverseName)) {
-                    tempMdProvider.getLocks().reset();
-                    stopFeedBeforeDelete(new Pair<>(dvId, new 
Identifier(activeEntityId.getEntityName())),
-                            tempMdProvider);
-                    // prepare job to remove feed log storage
-                    
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
-                            MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverseName, activeEntityId.getEntityName())));
+                    if (listener.getState() != ActivityState.STOPPED) {
+                        ((ActiveEntityEventsListener) 
listener).stop(metadataProvider);
+                    }
+                    FeedEventsListener feedListener = (FeedEventsListener) 
listener;
+                    mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+                    bActiveTxn = true;
+                    metadataProvider.setMetadataTxnContext(mdTxnCtx);
+                    doDropFeed(hcc, metadataProvider, feedListener.getFeed());
+                    
MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+                    bActiveTxn = false;
                 }
             }
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            bActiveTxn = true;
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
             // #. prepare jobs which will drop corresponding datasets with 
indexes.
             List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
@@ -1295,20 +1296,6 @@
         }
     }
 
-    protected void stopFeedBeforeDelete(Pair<Identifier, Identifier> 
feedNameComp, MetadataProvider metadataProvider) {
-        StopFeedStatement disStmt = new StopFeedStatement(feedNameComp);
-        try {
-            handleStopFeedStatement(metadataProvider, disStmt);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Stopped feed " + feedNameComp.second.getValue());
-            }
-        } catch (Exception exception) {
-            if (LOGGER.isLoggable(Level.WARNING)) {
-                LOGGER.warning("Unable to stop feed " + 
feedNameComp.second.getValue() + exception);
-            }
-        }
-    }
-
     public void handleDatasetDropStatement(MetadataProvider metadataProvider, 
Statement stmt,
             IHyracksClientConnection hcc) throws Exception {
         DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1403,8 +1390,8 @@
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + 
" in dataverse " + dataverseName);
             }
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
             IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
@@ -1968,32 +1955,35 @@
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 return;
             }
-
-            EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-            ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-            ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-            ActiveEntityEventsListener listener =
-                    (ActiveEntityEventsListener) 
activeEventHandler.getActiveEntityListener(feedId);
-            if (listener != null) {
-                throw new AlgebricksException("Feed " + feedId
-                        + " is currently active and connected to the following 
dataset(s) \n" + listener.toString());
-            } else {
-                JobSpecification spec = 
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
-                        MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getEntityName()));
-                runJob(hcc, spec);
-                MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName, 
feedName);
-            }
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Removed feed " + feedId);
-            }
+            doDropFeed(hcc, metadataProvider, feed);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
         } finally {
             metadataProvider.getLocks().unlock();
+        }
+    }
+
+    protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider 
metadataProvider, Feed feed)
+            throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
+        EntityId feedId = feed.getFeedId();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(feedId);
+        if (listener != null && listener.getState() != ActivityState.STOPPED) {
+            throw new AlgebricksException("Feed " + feedId
+                    + " is currently active and connected to the following 
dataset(s) \n" + listener.toString());
+        } else if (listener != null) {
+            listener.unregister();
+        }
+        JobSpecification spec = 
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
+                MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(), feedId.getEntityName()));
+        runJob(hcc, spec);
+        MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(), 
feed.getFeedName());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Removed feed " + feedId);
         }
     }
 
@@ -2029,56 +2019,41 @@
         StartFeedStatement sfs = (StartFeedStatement) stmt;
         String dataverseName = getActiveDataverse(sfs.getDataverseName());
         String feedName = sfs.getFeedName().getValue();
-        // Transcation handler
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        // Runtime handler
-        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-        // Feed & Feed Connections
-        Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
feedName,
-                metadataProvider.getMetadataTxnContext());
-        List<FeedConnection> feedConnections = MetadataManager.INSTANCE
-                .getFeedConections(metadataProvider.getMetadataTxnContext(), 
dataverseName, feedName);
-        ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
-        IStorageComponentProvider storageComponentProvider = new 
StorageComponentProvider();
-        DefaultStatementExecutorFactory qtFactory = new 
DefaultStatementExecutorFactory();
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler
-                .getActiveEntityListener(entityId);
-        if (listener != null) {
-            throw new AlgebricksException("Feed " + feedName + " is started 
already.");
-        }
-        // Start
+        boolean committed = false;
         
MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(), 
dataverseName,
-                dataverseName + "." + feedName, feedConnections);
+                dataverseName + "." + feedName);
         try {
-            // Prepare policy
-            List<IDataset> datasets = new ArrayList<>();
-            for (FeedConnection connection : feedConnections) {
-                Dataset ds = 
metadataProvider.findDataset(connection.getDataverseName(), 
connection.getDatasetName());
-                datasets.add(ds);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx);
+            // Runtime handler
+            EntityId entityId = new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName);
+            // Feed & Feed Connections
+            Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
feedName,
+                    metadataProvider.getMetadataTxnContext());
+            List<FeedConnection> feedConnections = MetadataManager.INSTANCE
+                    
.getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName, 
feedName);
+            
MetadataLockManager.INSTANCE.lockFeedConnections(metadataProvider.getLocks(), 
feedConnections);
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
+            if (listener == null) {
+                // Prepare policy
+                List<IDataset> datasets = new ArrayList<>();
+                for (FeedConnection connection : feedConnections) {
+                    Dataset ds =
+                            
metadataProvider.findDataset(connection.getDataverseName(), 
connection.getDatasetName());
+                    datasets.add(ds);
+                }
+                listener = new FeedEventsListener(this, hcc, entityId, 
datasets, null,
+                        FeedIntakeOperatorNodePushable.class.getSimpleName(), 
NoRetryPolicyFactory.INSTANCE, feed,
+                        feedConnections);
             }
-            org.apache.commons.lang3.tuple.Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> jobInfo =
-                    FeedOperations.buildStartFeedJob(sessionOutput, 
metadataProvider, feed, feedConnections,
-                            compilationProvider, storageComponentProvider, 
qtFactory, hcc);
-
-            JobSpecification feedJob = jobInfo.getLeft();
-            listener = new ActiveEntityEventsListener(appCtx, entityId, 
datasets, jobInfo.getRight(),
-                    FeedIntakeOperatorNodePushable.class.getSimpleName());
-            activeEventHandler.registerListener(listener);
-            IActiveEventSubscriber eventSubscriber = new 
WaitForStateSubscriber(listener, ActivityState.STARTED);
-            
feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, 
entityId);
-            // TODO(Yingyi): currently we do not check IFrameWriter protocol 
violations for Feed jobs.
-            // We will need to design general exception handling mechanism for 
feeds.
-            JobUtils.runJob(hcc, feedJob,
-                    
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
-            eventSubscriber.sync();
-            LOGGER.log(Level.INFO, "Submitted");
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            committed = true;
+            listener.start(metadataProvider);
         } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            if (listener != null) {
-                activeEventHandler.unregisterListener(listener);
+            if (!committed) {
+                abort(e, e, mdTxnCtx);
             }
             throw e;
         } finally {
@@ -2090,32 +2065,18 @@
         StopFeedStatement sfst = (StopFeedStatement) stmt;
         String dataverseName = getActiveDataverse(sfst.getDataverseName());
         String feedName = sfst.getFeedName().getValue();
-        EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
+        EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         // Obtain runtime info from ActiveListener
-        ActiveEntityEventsListener listener =
-                (ActiveEntityEventsListener) 
activeEventHandler.getActiveEntityListener(feedId);
+        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
         if (listener == null) {
             throw new AlgebricksException("Feed " + feedName + " is not 
started.");
         }
-        IActiveEventSubscriber eventSubscriber = new 
WaitForStateSubscriber(listener, ActivityState.STOPPED);
-        // Transaction
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        
MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), 
dataverseName, feedName);
+        
MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(), 
entityId.getDataverse(),
+                entityId.getEntityName());
         try {
-            // validate
-            FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName, 
mdTxnCtx);
-            // Construct ActiveMessage
-            for (int i = 0; i < listener.getLocations().getLocations().length; 
i++) {
-                String intakeLocation = 
listener.getLocations().getLocations()[i];
-                FeedOperations.SendStopMessageToNode(appCtx, feedId, 
intakeLocation, i);
-            }
-            eventSubscriber.sync();
-        } catch (Exception e) {
-            abort(e, e, mdTxnCtx);
-            throw e;
+            listener.stop(metadataProvider);
         } finally {
             metadataProvider.getLocks().unlock();
         }
@@ -2131,10 +2092,9 @@
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // Check whether feed is alive
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName)) != null) {
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        if (activeEventHandler.getListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName)) != null) {
             throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
         }
         // Transaction handling
@@ -2185,21 +2145,26 @@
         String feedName = cfs.getFeedName().getValue();
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        ActiveLifecycleListener activeListener = (ActiveLifecycleListener) 
appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeEventHandler = 
activeListener.getNotificationHandler();
-        // Check whether feed is alive
-        if (activeEventHandler
-                .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName)) != null) {
-            throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
-        }
         
MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(), 
dataverseName,
                 dataverseName + "." + datasetName, dataverseName + "." + 
cfs.getFeedName());
         try {
+            ActiveNotificationHandler activeEventHandler =
+                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            // Check whether feed is alive
+            IActiveEntityEventsListener listener =
+                    activeEventHandler.getListener(new 
EntityId(Feed.EXTENSION_NAME, dataverseName, feedName));
+            if (listener != null && listener.isActive()) {
+                throw new 
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED, 
feedName);
+            }
             FeedMetadataUtil.validateIfDatasetExists(metadataProvider, 
dataverseName, cfs.getDatasetName().getValue(),
                     mdTxnCtx);
             FeedMetadataUtil.validateIfFeedExists(dataverseName, 
cfs.getFeedName().getValue(), mdTxnCtx);
             FeedConnection fc = 
MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
                     dataverseName, feedName, datasetName);
+            Dataset ds = metadataProvider.findDataset(dataverseName, 
datasetName);
+            if (ds == null) {
+                throw new CompilationException("Dataset " + dataverseName + 
"." + datasetName + " doesn't exist");
+            }
             if (fc == null) {
                 throw new CompilationException("Feed " + feedName + " is 
currently not connected to "
                         + cfs.getDatasetName().getValue() + ". Invalid 
operation!");
@@ -2211,6 +2176,9 @@
                 MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function);
             }
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            if (listener != null) {
+                listener.remove(ds);
+            }
         } catch (Exception e) {
             abort(e, e, mdTxnCtx);
             throw e;
@@ -2952,4 +2920,14 @@
         IStatementRewriter rewriter = 
rewriterFactory.createStatementRewriter();
         rewriter.rewrite(stmt);
     }
+
+    @Override
+    public IStorageComponentProvider getComponentProvider() {
+        return componentProvider;
+    }
+
+    @Override
+    public ICcApplicationContext getAppCtx() {
+        return appCtx;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index cd9138a..c1def23 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -30,7 +30,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ApiServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
@@ -134,7 +134,7 @@
         statementExecutorCtx = new StatementExecutorContext();
         appCtx = new CcApplicationContext(ccServiceCtx, getHcc(), 
libraryManager, resourceIdManager,
                 () -> MetadataManager.INSTANCE, 
GlobalRecoveryManager.instance(), ftStrategy,
-                new ActiveLifecycleListener(), componentProvider);
+                new ActiveNotificationHandler(), componentProvider);
         ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
         ccExtensionManager = new CCExtensionManager(getExtensions());
         appCtx.setExtensionManager(ccExtensionManager);
@@ -147,7 +147,7 @@
         
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
         ccServiceCtx.setDistributedState(proxy);
         MetadataManager.initialize(proxy, metadataProperties);
-        
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveLifecycleListener());
+        
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
 
         // create event loop groups
         webManager = new WebManager();
@@ -178,7 +178,7 @@
 
     @Override
     public void stop() throws Exception {
-        ((ActiveLifecycleListener) appCtx.getActiveLifecycleListener()).stop();
+        ((ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler()).stop();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Stopping Asterix cluster controller");
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..df4825a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -30,9 +30,9 @@
 
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -40,8 +40,6 @@
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedConnectionRequest;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -50,6 +48,9 @@
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
@@ -57,19 +58,23 @@
 import org.apache.asterix.lang.common.statement.DataverseDecl;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
 import org.apache.asterix.metadata.entities.Feed;
 import org.apache.asterix.metadata.entities.FeedConnection;
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
 import org.apache.asterix.metadata.feeds.LocationConstraint;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import org.apache.asterix.translator.CompiledStatements;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionOutput;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -109,6 +114,55 @@
 public class FeedOperations {
 
     private FeedOperations() {
+    }
+
+    public static AlgebricksAbsolutePartitionConstraint getFeedLocations(Feed 
feed, ICcApplicationContext appCtx,
+            MetadataTransactionContext mdTxnCtx) throws Exception {
+        return getFeedFactory(feed, appCtx, mdTxnCtx).getPartitionConstraint();
+    }
+
+    public static IAdapterFactory getFeedFactory(Feed feed, 
ICcApplicationContext appCtx,
+            MetadataTransactionContext mdTxnCtx) throws Exception {
+        String adapterName = feed.getAdapterName();
+        Map<String, String> configuration = feed.getAdapterConfiguration();
+        ARecordType adapterOutputType = FeedMetadataUtil.getOutputType(feed, 
feed.getAdapterConfiguration(),
+                ExternalDataConstants.KEY_TYPE_NAME);
+        ARecordType metaType =
+                FeedMetadataUtil.getOutputType(feed, configuration, 
ExternalDataConstants.KEY_META_TYPE_NAME);
+        ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(), 
feed.getFeedName());
+        // Get adapter from metadata dataset <Metadata dataverse>
+        DatasourceAdapter adapterEntity =
+                MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+        // Get adapter from metadata dataset <The feed dataverse>
+        if (adapterEntity == null) {
+            adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, 
feed.getDataverseName(), adapterName);
+        }
+        IAdapterFactory adapterFactory;
+        if (adapterEntity != null) {
+            AdapterType adapterType = adapterEntity.getType();
+            String adapterFactoryClassname = adapterEntity.getClassname();
+            switch (adapterType) {
+                case INTERNAL:
+                    adapterFactory = (IAdapterFactory) 
Class.forName(adapterFactoryClassname).newInstance();
+                    break;
+                case EXTERNAL:
+                    String[] anameComponents = adapterName.split("#");
+                    String libraryName = anameComponents[0];
+                    ClassLoader cl =
+                            
appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(), 
libraryName);
+                    adapterFactory = (IAdapterFactory) 
cl.loadClass(adapterFactoryClassname).newInstance();
+                    break;
+                default:
+                    throw new AsterixException("Unknown Adapter type " + 
adapterType);
+            }
+            adapterFactory.setOutputType(adapterOutputType);
+            adapterFactory.setMetaType(metaType);
+            adapterFactory.configure(appCtx.getServiceContext(), 
configuration);
+        } else {
+            adapterFactory = 
AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(), 
adapterName,
+                    configuration, adapterOutputType, metaType);
+        }
+        return adapterFactory;
     }
 
     private static Pair<JobSpecification, IAdapterFactory> 
buildFeedIntakeJobSpec(Feed feed,
@@ -151,9 +205,8 @@
         return spec;
     }
 
-    private static JobSpecification getConnectionJob(SessionOutput 
sessionOutput, MetadataProvider metadataProvider,
-            FeedConnection feedConnection, String[] locations, 
ILangCompilationProvider compilationProvider,
-            IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
+    private static JobSpecification getConnectionJob(IStatementExecutor 
statementExecutor,
+            MetadataProvider metadataProvider, FeedConnection feedConnection, 
String[] locations,
             IHyracksClientConnection hcc) throws AlgebricksException, 
RemoteException, ACIDException {
         DataverseDecl dataverseDecl = new DataverseDecl(new 
Identifier(feedConnection.getDataverseName()));
         FeedConnectionRequest fcr =
@@ -164,8 +217,6 @@
         List<Statement> statements = new ArrayList<>();
         statements.add(dataverseDecl);
         statements.add(subscribeStmt);
-        IStatementExecutor translator = 
qtFactory.create(metadataProvider.getApplicationContext(), statements,
-                sessionOutput, compilationProvider, storageComponentProvider);
         // configure the metadata provider
         
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" + 
Boolean.TRUE);
         metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME, 
"" + subscribeStmt.getPolicy());
@@ -174,7 +225,7 @@
 
         CompiledStatements.CompiledSubscribeFeedStatement csfs = new 
CompiledStatements.CompiledSubscribeFeedStatement(
                 subscribeStmt.getSubscriptionRequest(), 
subscribeStmt.getVarCounter());
-        return translator.rewriteCompileQuery(hcc, metadataProvider, 
subscribeStmt.getQuery(), csfs);
+        return statementExecutor.rewriteCompileQuery(hcc, metadataProvider, 
subscribeStmt.getQuery(), csfs);
     }
 
     private static JobSpecification combineIntakeCollectJobs(MetadataProvider 
metadataProvider, Feed feed,
@@ -272,8 +323,8 @@
             }
 
             // make connections between operators
-            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>,
-                    Pair<IOperatorDescriptor, Integer>>> entry : 
subJob.getConnectorOperatorMap().entrySet()) {
+            for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, 
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+                    .getConnectorOperatorMap().entrySet()) {
                 ConnectorDescriptorId newId = 
connectorIdMapping.get(entry.getKey());
                 IConnectorDescriptor connDesc = 
jobSpec.getConnectorMap().get(newId);
                 Pair<IOperatorDescriptor, Integer> leftOp = 
entry.getValue().getLeft();
@@ -356,10 +407,8 @@
     }
 
     public static Pair<JobSpecification, 
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
-            SessionOutput sessionOutput, MetadataProvider metadataProvider, 
Feed feed,
-            List<FeedConnection> feedConnections, ILangCompilationProvider 
compilationProvider,
-            IStorageComponentProvider storageComponentProvider, 
DefaultStatementExecutorFactory qtFactory,
-            IHyracksClientConnection hcc) throws Exception {
+            IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider, Feed feed,
+            List<FeedConnection> feedConnections, IHyracksClientConnection 
hcc) throws Exception {
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
         // TODO: Change the default Datasource to use all possible partitions
         Pair<JobSpecification, IAdapterFactory> intakeInfo = 
buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
@@ -371,8 +420,8 @@
         String[] ingestionLocations = 
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
         // Add connection job
         for (FeedConnection feedConnection : feedConnections) {
-            JobSpecification connectionJob = getConnectionJob(sessionOutput, 
metadataProvider, feedConnection,
-                    ingestionLocations, compilationProvider, 
storageComponentProvider, qtFactory, hcc);
+            JobSpecification connectionJob =
+                    getConnectionJob(statementExecutor, metadataProvider, 
feedConnection, ingestionLocations, hcc);
             jobsList.add(connectionJob);
         }
         return Pair.of(combineIntakeCollectJobs(metadataProvider, feed, 
intakeJob, jobsList, feedConnections,
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 956d111..a9e269a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -21,23 +21,28 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveRuntime;
+import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.algebra.base.ILangExtension.Language;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
 import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionOutput;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
@@ -71,8 +76,8 @@
         String requestedStats;
         CcApplicationContext appCtx =
                 (CcApplicationContext) 
ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
-        ActiveLifecycleListener activeLifecycleListener = 
(ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
-        ActiveJobNotificationHandler activeJobNotificationHandler = 
activeLifecycleListener.getNotificationHandler();
+        ActiveNotificationHandler activeJobNotificationHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
         JobId jobId = new JobId(1);
 
         // Mock ActiveRuntime
@@ -82,13 +87,18 @@
 
         // Mock JobSpecification
         JobSpecification jobSpec = Mockito.mock(JobSpecification.class);
-        
Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
-                .thenReturn(entityId);
+        
Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId);
 
+        // Create the statement executor (with a mocked SessionOutput) handed to the events listener
+        CCExtensionManager extensionManager = (CCExtensionManager) 
appCtx.getExtensionManager();
+        IStatementExecutor statementExecutor = extensionManager
+                
.getStatementExecutorFactory(appCtx.getServiceContext().getControllerService().getExecutor())
+                .create(appCtx, Collections.emptyList(), 
Mockito.mock(SessionOutput.class),
+                        
extensionManager.getCompilationProvider(Language.SQLPP), 
appCtx.getStorageComponentProvider());
         // Add event listener
-        ActiveEntityEventsListener eventsListener = new 
ActiveEntityEventsListener(appCtx, entityId, datasetList,
-                partitionConstraint, 
FeedIntakeOperatorNodePushable.class.getSimpleName());
-        activeJobNotificationHandler.registerListener(eventsListener);
+        ActiveEntityEventsListener eventsListener =
+                new ActiveEntityEventsListener(statementExecutor, null, 
entityId, datasetList, partitionConstraint,
+                        FeedIntakeOperatorNodePushable.class.getSimpleName(), 
NoRetryPolicyFactory.INSTANCE);
 
         // Register mock runtime
         NCAppRuntimeContext nc1AppCtx =
@@ -107,14 +117,14 @@
         eventsListener.subscribe(startingSubscriber);
         // Update stats of created/started job without joined partition
         activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
-        activeLifecycleListener.notifyJobStart(jobId);
+        activeJobNotificationHandler.notifyJobStart(jobId);
         startingSubscriber.sync();
         eventsListener.refreshStats(1000);
         requestedStats = eventsListener.getStats();
         Assert.assertTrue(requestedStats.contains("N/A"));
 
         // Fake partition message and notify eventListener
-        WaitForStateSubscriber startedSubscriber = new 
WaitForStateSubscriber(eventsListener, ActivityState.STARTED);
+        WaitForStateSubscriber startedSubscriber = new 
WaitForStateSubscriber(eventsListener, ActivityState.RUNNING);
         eventsListener.subscribe(startedSubscriber);
         ActivePartitionMessage partitionMessage = new 
ActivePartitionMessage(activeRuntimeId, jobId,
                 ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 249bd56..8e84782 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -63,9 +63,9 @@
     IFaultToleranceStrategy getFaultToleranceStrategy();
 
     /**
-     * @return the active lifecycle listener at Cluster controller
+     * @return the active notification handler at Cluster controller
      */
-    IJobLifecycleListener getActiveLifecycleListener();
+    IJobLifecycleListener getActiveNotificationHandler();
 
     /**
      * @return a new instance of {@link IHyracksClientConnection}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
index a0e6e71..48507a7 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
@@ -32,4 +32,13 @@
      */
     int[] getPrimaryBloomFilterFields();
 
+    /**
+     * @return the dataverse name
+     */
+    String getDataverseName();
+
+    /**
+     * @return the dataset name
+     */
+    String getDatasetName();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2eb81d4..f29a5e5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -23,13 +23,13 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.IIndexibleExternalDataSource;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
index 37cc1cf..d142cba 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
@@ -20,6 +20,7 @@
 
 import java.util.List;
 
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 
 public interface IIndexingAdapterFactory extends IAdapterFactory {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index e6d81d3..8904a18 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0681d71..f3c3726 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.dataset.adapter;
 
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 822d725..d351c51 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,37 +20,41 @@
 
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.IActiveEventSubscriber;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public abstract class AbstractSubscriber implements IActiveEventSubscriber {
 
     protected final IActiveEntityEventsListener listener;
-    private boolean done = false;
+    private volatile boolean done = false;
+    private volatile Exception failure = null;
 
     public AbstractSubscriber(IActiveEntityEventsListener listener) {
         this.listener = listener;
     }
 
     @Override
-    public synchronized boolean isDone() {
+    public boolean isDone() {
         return done;
     }
 
-    public synchronized void complete() throws HyracksDataException {
-        done = true;
-        notifyAll();
-    }
-
-    @Override
-    public synchronized void sync() throws InterruptedException {
-        while (!done) {
-            wait();
+    public void complete(Exception failure) {
+        synchronized (listener) {
+            done = true;
+            if (failure != null) {
+                this.failure = failure;
+            }
+            listener.notifyAll();
         }
     }
 
     @Override
-    public synchronized void unsubscribe() {
-        done = true;
-        notifyAll();
+    public void sync() throws Exception {
+        synchronized (listener) {
+            while (!done) { // block until complete(...) flips done and notifies on the listener monitor
+                listener.wait();
+            }
+            if (failure != null) { // complete(...) sets failure together with done, so check after the loop
+                throw failure;
+            }
+        }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 42f7a74..90cf45f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -49,11 +49,6 @@
     }
 
     @Override
-    public void unsubscribe() {
-        // no op
-    }
-
-    @Override
     public void subscribed(IActiveEntityEventsListener eventsListener) throws 
HyracksDataException {
         // no op
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
index fa2fa7f..804fbd8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -32,7 +32,17 @@
     @Override
     public void notify(ActiveEvent event) throws HyracksDataException {
         if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
-            complete();
+            try {
+                complete(null);
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
+        } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) {
+            try {
+                complete((Exception) event.getEventObject());
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
         }
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 7bab421..6ae62ae 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -39,7 +39,13 @@
     @Override
     public void notify(ActiveEvent event) throws HyracksDataException {
         if (listener.getState() == targetState) {
-            complete();
+            complete(null);
+        } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) {
+            try {
+                complete((Exception) event.getEventObject());
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
         }
     }
 
@@ -49,7 +55,7 @@
             throw new 
RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
         }
         if (listener.getState() == targetState) {
-            complete();
+            complete(null);
         }
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 93acb26..7ff429d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.external.operators;
 
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 3a06a2b..f38a005 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -22,11 +22,11 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.om.types.ARecordType;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8c6a420..4ece727 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -21,9 +21,9 @@
 import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
 import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index d6ac5d1..aecb790 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,9 +21,9 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IIndexingAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 1c28940..72efc95 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -22,10 +22,10 @@
 import java.io.InputStream;
 import java.util.Map;
 
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
 import org.apache.asterix.external.api.ITupleForwarder;
 import org.apache.asterix.external.parser.ADMDataParser;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index c84a5bd..08ca03b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -57,8 +57,7 @@
     // Key is dataverse name. Key of value map is dataset name.
     protected final Map<String, Map<String, Dataset>> datasets = new 
HashMap<>();
     // Key is dataverse name. Key of value map is dataset name. Key of value 
map of value map is index name.
-    protected final Map<String, Map<String, Map<String, Index>>> indexes =
-            new HashMap<>();
+    protected final Map<String, Map<String, Map<String, Index>>> indexes = new 
HashMap<>();
     // Key is dataverse name. Key of value map is datatype name.
     protected final Map<String, Map<String, Datatype>> datatypes = new 
HashMap<>();
     // Key is dataverse name.
@@ -66,19 +65,16 @@
     // Key is function Identifier . Key of value map is function name.
     protected final Map<FunctionSignature, Function> functions = new 
HashMap<>();
     // Key is adapter dataverse. Key of value map is the adapter name
-    protected final Map<String, Map<String, DatasourceAdapter>> adapters =
-            new HashMap<>();
+    protected final Map<String, Map<String, DatasourceAdapter>> adapters = new 
HashMap<>();
 
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies =
-            new HashMap<>();
+    protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies = 
new HashMap<>();
     // Key is library dataverse. Key of value map is the library name
     protected final Map<String, Map<String, Library>> libraries = new 
HashMap<>();
     // Key is library dataverse. Key of value map is the feed name
     protected final Map<String, Map<String, Feed>> feeds = new HashMap<>();
     // Key is DataverseName, Key of the value map is the Policy name
-    protected final Map<String, Map<String, CompactionPolicy>> 
compactionPolicies =
-            new HashMap<>();
+    protected final Map<String, Map<String, CompactionPolicy>> 
compactionPolicies = new HashMap<>();
     // Key is DataverseName, Key of value map is feedConnectionId
     protected final Map<String, Map<String, FeedConnection>> feedConnections = 
new HashMap<>();
 
@@ -247,8 +243,7 @@
                                             
datatypes.remove(dataverse.getDataverseName());
                                             
adapters.remove(dataverse.getDataverseName());
                                             
compactionPolicies.remove(dataverse.getDataverseName());
-                                            List<FunctionSignature> 
markedFunctionsForRemoval =
-                                                    new ArrayList<>();
+                                            List<FunctionSignature> 
markedFunctionsForRemoval = new ArrayList<>();
                                             for (FunctionSignature signature : 
functions.keySet()) {
                                                 if 
(signature.getNamespace().equals(dataverse.getDataverseName())) {
                                                     
markedFunctionsForRemoval.add(signature);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 93b19f1..b35fff7 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -26,6 +26,8 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ClusterProperties;
@@ -39,8 +41,6 @@
 import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.metadata.IDatasetDetails;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index e2b1761..e626892 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,8 +20,8 @@
 
 import java.util.List;
 
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.entities.Dataset;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index b13f4c2..18799a5 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -23,8 +23,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3b70ea9..9960645 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -28,6 +28,8 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -38,8 +40,6 @@
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
@@ -397,23 +397,23 @@
     }
 
     public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, 
IAdapterFactory> buildFeedIntakeRuntime(
-            JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor 
policyAccessor) throws Exception {
+            JobSpecification jobSpec, Feed feed, FeedPolicyAccessor 
policyAccessor) throws Exception {
         Triple<IAdapterFactory, RecordDescriptor, 
IDataSourceAdapter.AdapterType> factoryOutput;
-        factoryOutput = 
FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor, 
mdTxnCtx,
+        factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed, 
policyAccessor, mdTxnCtx,
                 getApplicationContext());
-        ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed, 
primaryFeed.getAdapterConfiguration(),
+        ARecordType recordType = FeedMetadataUtil.getOutputType(feed, 
feed.getAdapterConfiguration(),
                 ExternalDataConstants.KEY_TYPE_NAME);
         IAdapterFactory adapterFactory = factoryOutput.first;
         FeedIntakeOperatorDescriptor feedIngestor = null;
         switch (factoryOutput.third) {
             case INTERNAL:
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, 
primaryFeed, adapterFactory, recordType,
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, 
adapterFactory, recordType,
                         policyAccessor, factoryOutput.second);
                 break;
             case EXTERNAL:
-                String libraryName = primaryFeed.getAdapterName().trim()
+                String libraryName = feed.getAdapterName().trim()
                         
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
-                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, 
primaryFeed, libraryName,
+                feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, 
libraryName,
                         adapterFactory.getClass().getName(), recordType, 
policyAccessor, factoryOutput.second);
                 break;
             default:
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9131692..2383eb6 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -27,7 +27,7 @@
 import java.util.logging.Logger;
 import java.util.stream.IntStream;
 
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
@@ -179,8 +179,8 @@
     public Dataset(Dataset dataset, boolean forRebalance, String 
targetNodeGroupName) {
         this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
                 dataset.metaTypeDataverseName, dataset.metaTypeName, 
targetNodeGroupName,
-                dataset.compactionPolicyFactory,
-                dataset.compactionPolicyProperties, dataset.datasetDetails, 
dataset.hints, dataset.datasetType,
+                dataset.compactionPolicyFactory, 
dataset.compactionPolicyProperties, dataset.datasetDetails,
+                dataset.hints, dataset.datasetType,
                 forRebalance ? 
DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : 
dataset.datasetId,
                 dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 : 
dataset.rebalanceCount);
     }
@@ -206,10 +206,12 @@
         this.rebalanceCount = rebalanceCount;
     }
 
+    @Override
     public String getDataverseName() {
         return dataverseName;
     }
 
+    @Override
     public String getDatasetName() {
         return datasetName;
     }
@@ -325,9 +327,9 @@
         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> 
disconnectJobList = new HashMap<>();
         if (getDatasetType() == DatasetType.INTERNAL) {
             // prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
-            ActiveLifecycleListener activeListener =
-                    (ActiveLifecycleListener) 
metadataProvider.getApplicationContext().getActiveLifecycleListener();
-            IActiveEntityEventsListener[] activeListeners = 
activeListener.getNotificationHandler().getEventListeners();
+            ActiveNotificationHandler activeListener =
+                    (ActiveNotificationHandler) 
metadataProvider.getApplicationContext().getActiveNotificationHandler();
+            IActiveEntityEventsListener[] activeListeners = 
activeListener.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
                 if (listener.isEntityUsingDataset(this)) {
                     throw new 
CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
@@ -706,8 +708,8 @@
     public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider 
metadataProvider) throws AlgebricksException {
         List<List<String>> partitioningKeys = getPrimaryKeys();
         int numPrimaryKeys = partitioningKeys.size();
-        ISerializerDeserializer[] primaryRecFields = new 
ISerializerDeserializer[numPrimaryKeys + 1
-                + (hasMetaPart() ? 1 : 0)];
+        ISerializerDeserializer[] primaryRecFields =
+                new ISerializerDeserializer[numPrimaryKeys + 1 + 
(hasMetaPart() ? 1 : 0)];
         ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(hasMetaPart() ? 1 : 0)];
         ISerializerDeserializerProvider serdeProvider = 
metadataProvider.getFormat().getSerdeProvider();
         List<Integer> indicators = null;
@@ -719,9 +721,9 @@
 
         // Set the serde/traits for primary keys
         for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType = (indicators == null || indicators.get(i) == 0)
-                    ? itemType.getSubFieldType(partitioningKeys.get(i))
-                    : metaType.getSubFieldType(partitioningKeys.get(i));
+            IAType keyType =
+                    (indicators == null || indicators.get(i) == 0) ? 
itemType.getSubFieldType(partitioningKeys.get(i))
+                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
             primaryRecFields[i] = 
serdeProvider.getSerializerDeserializer(keyType);
             primaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
@@ -731,8 +733,8 @@
         primaryTypeTraits[numPrimaryKeys] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         if (hasMetaPart()) {
             // Set the serde and traits for the meta record field
-            primaryRecFields[numPrimaryKeys + 1] = 
SerializerDeserializerProvider.INSTANCE
-                    .getSerializerDeserializer(metaType);
+            primaryRecFields[numPrimaryKeys + 1] =
+                    
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
             primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         }
         return new RecordDescriptor(primaryRecFields, primaryTypeTraits);
@@ -758,9 +760,9 @@
             indicators = ((InternalDatasetDetails) 
getDatasetDetails()).getKeySourceIndicator();
         }
         for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType = (indicators == null || indicators.get(i) == 0)
-                    ? recordType.getSubFieldType(partitioningKeys.get(i))
-                    : metaType.getSubFieldType(partitioningKeys.get(i));
+            IAType keyType =
+                    (indicators == null || indicators.get(i) == 0) ? 
recordType.getSubFieldType(partitioningKeys.get(i))
+                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
             cmpFactories[i] = 
cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
         }
         return cmpFactories;
@@ -803,8 +805,8 @@
 
     // Gets an array of partition numbers for this dataset.
     protected int[] getDatasetPartitions(MetadataProvider metadataProvider) 
throws AlgebricksException {
-        FileSplit[] splitsForDataset = 
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
-                getDatasetName());
+        FileSplit[] splitsForDataset =
+                
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, 
getDatasetName());
         return IntStream.range(0, splitsForDataset.length).toArray();
     }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index 5a85327..f263703 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.entities;
 
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.api.IMetadataEntity;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 7f4e28d..8a7a9b8 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -24,7 +24,7 @@
 import java.io.DataInputStream;
 import java.util.Calendar;
 
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter;
 import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.metadata.MetadataException;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 94b36ed..d1001030 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -21,15 +21,15 @@
 import java.rmi.RemoteException;
 import java.util.Map;
 
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
 import org.apache.asterix.external.feed.api.IFeed;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
index 43d72e3..393af7b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
@@ -266,13 +266,16 @@
         acquireDataverseReadLock(locks, dataverseName);
     }
 
-    public void startFeedBegin(LockList locks, String dataverseName, String 
feedName,
-            List<FeedConnection> feedConnections) throws AsterixException {
+    public void startFeedBegin(LockList locks, String dataverseName, String 
feedName) throws AsterixException {
         acquireDataverseReadLock(locks, dataverseName);
         acquireFeedReadLock(locks, feedName);
+
+    }
+
+    public void lockFeedConnections(LockList locks, List<FeedConnection> 
feedConnections) throws AsterixException {
         for (FeedConnection feedConnection : feedConnections) {
             // what if the dataset is in a different dataverse
-            String fqName = dataverseName + "." + 
feedConnection.getDatasetName();
+            String fqName = feedConnection.getDataverseName() + "." + 
feedConnection.getDatasetName();
             acquireDatasetReadLock(locks, fqName);
         }
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 5bb0aa9..fb655eb 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -28,11 +28,11 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.active.IAdapterFactory;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.TransactionState;
 import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import 
org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 4cd243c..d43af8c 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -210,7 +210,7 @@
     }
 
     @Override
-    public IJobLifecycleListener getActiveLifecycleListener() {
+    public IJobLifecycleListener getActiveNotificationHandler() {
         return activeLifeCycleListener;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 30ffebe..1506872 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -18,12 +18,14 @@
  */
 package org.apache.hyracks.api.job;
 
+import java.util.List;
+
 import org.apache.hyracks.api.exceptions.HyracksException;
 
 public interface IJobLifecycleListener {
-    public void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException;
+    void notifyJobCreation(JobId jobId, JobSpecification spec) throws 
HyracksException;
 
-    public void notifyJobStart(JobId jobId) throws HyracksException;
+    void notifyJobStart(JobId jobId) throws HyracksException;
 
-    public void notifyJobFinish(JobId jobId) throws HyracksException;
+    void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> 
exceptions) throws HyracksException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 5075081..26245e1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -36,6 +36,7 @@
 import org.apache.hyracks.api.job.IJobLifecycleListener;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
@@ -88,14 +89,14 @@
         }
     }
 
-    public synchronized void notifyJobFinish(JobId jobId) throws 
HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
+            throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
-            l.notifyJobFinish(jobId);
+            l.notifyJobFinish(jobId, jobStatus, exceptions);
         }
     }
 
-    public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec)
-            throws HyracksException {
+    public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec) throws HyracksException {
         for (IJobLifecycleListener l : jobLifecycleListeners) {
             l.notifyJobCreation(jobId, spec);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..32403ce 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.cc.PreDistributedJobStore;
 import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
@@ -69,7 +70,7 @@
         this.resultTTL = resultTTL;
         this.resultSweepThreshold = resultSweepThreshold;
         this.preDistributedJobStore = preDistributedJobStore;
-        jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+        jobResultLocations = new LinkedHashMap<>();
     }
 
     @Override
@@ -94,7 +95,7 @@
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
+    public void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions) throws HyracksException {
         // Auto-generated method stub
     }
 
@@ -189,7 +190,7 @@
 
     @Override
     public synchronized long getResultTimestamp(JobId jobId) {
-        if (preDistributedJobStore.jobIsPredistributed(jobId)){
+        if (preDistributedJobStore.jobIsPredistributed(jobId)) {
             return -1;
         }
         return getState(jobId).getTimestamp();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 45c7711..c1a7899 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -132,8 +132,8 @@
         // Removes a pending job.
         JobRun jobRun = jobQueue.remove(jobId);
         if (jobRun != null) {
-            List<Exception> exceptions = Collections
-                    
.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
+            List<Exception> exceptions =
+                    
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, 
jobId));
             // Since the job has not been executed, we only need to update its 
status and lifecycle here.
             jobRun.setStatus(JobStatus.FAILURE, exceptions);
             runMapArchive.put(jobId, jobRun);
@@ -179,7 +179,7 @@
                 } catch (Exception e) {
                     LOGGER.log(Level.SEVERE, e.getMessage(), e);
                     if (caughtException == null) {
-                        caughtException = new HyracksException(e);
+                        caughtException = HyracksException.create(e);
                     } else {
                         caughtException.addSuppressed(e);
                     }
@@ -208,7 +208,7 @@
         CCServiceContext serviceCtx = ccs.getContext();
         if (serviceCtx != null) {
             try {
-                serviceCtx.notifyJobFinish(jobId);
+                serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), 
run.getPendingExceptions());
             } catch (HyracksException e) {
                 LOGGER.log(Level.SEVERE, e.getMessage(), e);
                 caughtException = e;
@@ -248,8 +248,6 @@
             throw caughtException;
         }
     }
-
-
 
     @Override
     public Collection<JobRun> getRunningJobs() {
@@ -320,9 +318,8 @@
         try {
             run.getExecutor().startJob();
         } catch (Exception e) {
-            ccs.getWorkQueue()
-                    .schedule(new JobCleanupWork(ccs.getJobManager(), 
run.getJobId(), JobStatus.FAILURE,
-                            Collections.singletonList(e)));
+            ccs.getWorkQueue().schedule(new 
JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
+                    Collections.singletonList(e)));
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1875
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>

Reply via email to