abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/792

Change subject: Asterix-1389 Fix Deadlocks in Feed Connections
......................................................................

Asterix-1389 Fix Deadlocks in Feed Connections

This change ensures that each feed connect and feed disconnect
statement completes as an atomic operation.
Previously, we assumed that once the intake was ready on all nodes
and the connect had started, the connect was complete. That is not
true. For the connect to be complete, we need to ensure that the
connect subscribes to the intake on all intake nodes.
Likewise, the disconnect shouldn't return until the connect
job terminates.

Change-Id: Ib2778b4d7f156c7e06ac9f561a26783c4933a22c
---
M .gitignore
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-app/src/test/resources/runtimets/repeatedtestsuite.xml
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
R asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
13 files changed, 444 insertions(+), 430 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/92/792/1
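
As context for the diff below, here is a minimal, self-contained sketch of the
synchronization the commit message describes: the connect statement blocks until
collectors have subscribed on every intake partition, and the disconnect
statement blocks until the connect job terminates. The class and method names
(ConnectCompletionTracker, awaitConnected, awaitDisconnected) are hypothetical
illustrations only, not the identifiers introduced by this change.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the waiting pattern; not part of this change.
public class ConnectCompletionTracker {

    // One count per intake partition that must report a successful
    // subscription before the connect statement may return.
    private final CountDownLatch subscribed;
    // Released once the collect (connect) job terminates, so that
    // disconnect can wait for it.
    private final CountDownLatch terminated = new CountDownLatch(1);

    public ConnectCompletionTracker(int intakePartitions) {
        this.subscribed = new CountDownLatch(intakePartitions);
    }

    // Called when a collector has subscribed to the intake on one node.
    public void partitionSubscribed() {
        subscribed.countDown();
    }

    // Called when the connect (collect) job finishes.
    public void jobTerminated() {
        terminated.countDown();
    }

    // CONNECT FEED returns only after all intake partitions are subscribed,
    // not merely after the intake is ready and the job has started.
    public void awaitConnected(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!subscribed.await(timeout, unit)) {
            throw new TimeoutException("collectors did not subscribe on all intake nodes");
        }
    }

    // DISCONNECT FEED returns only after the connect job has terminated.
    public void awaitDisconnected() throws InterruptedException {
        terminated.await();
    }
}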

diff --git a/.gitignore b/.gitignore
index 8530a4b..82433c3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,18 +12,18 @@
 asterix_logs
 build
 bin
-asterix-app/src/test/resources/externallib/
-asterix-app/rttest/
-asterix-app/mdtest/
-asterix-app/opttest/
-asterix-app/parserts/
-asterix-app/library/
-asterix-app/opt_parserts/
-asterix-app/runtime_parserts/
-asterix-app/data/csv/beer.csv
-asterix-installer/ittest/
-asterix-installer/repliationtest/
-asterix-installer/src/test/resources/clusterts/asterix-installer-0.8.9-SNAPSHOT-binary-assembly/
+asterixdb/asterix-app/src/test/resources/externallib/
+asterixdb/asterix-app/rttest/
+asterixdb/asterix-app/mdtest/
+asterixdb/asterix-app/opttest/
+asterixdb/asterix-app/parserts/
+asterixdb/asterix-app/library/
+asterixdb/asterix-app/opt_parserts/
+asterixdb/asterix-app/runtime_parserts/
+asterixdb/asterix-app/data/csv/beer.csv
+asterixdb/asterix-installer/ittest/
+asterixdb/asterix-installer/repliationtest/
+asterixdb/asterix-installer/src/test/resources/clusterts/asterix-installer-0.8.9-SNAPSHOT-binary-assembly/
 build
 asterix_logs
 bin/
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
index 04f20fb..7018390 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java
@@ -81,7 +81,6 @@
 
     private final LinkedBlockingQueue<FeedEvent> inbox;
     private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> 
eventSubscribers;
-
     private final Map<JobId, FeedJobInfo> jobInfos;
     private final Map<FeedId, FeedIntakeInfo> intakeJobInfos;
     private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos;
@@ -100,7 +99,9 @@
 
     @Override
     public void run() {
-        FeedEvent event;
+        FeedEvent event = null;
+        Thread.currentThread().setName("FeedJobNotificationHandler");
+        LinkedBlockingQueue<FeedEvent> processed = new LinkedBlockingQueue<>();
         while (true) {
             try {
                 event = inbox.take();
@@ -111,8 +112,8 @@
                     case JOB_FINISH:
                         handleJobFinishEvent(event);
                         break;
-                    case PROVIDER_READY:
-                        handleProviderReady(event);
+                    case PARTITION_START:
+                        handlePartitionStart(event);
                         break;
                     default:
                         LOGGER.log(Level.WARNING, "Unknown Feed Event");
@@ -120,6 +121,8 @@
                 }
             } catch (Exception e) {
                 e.printStackTrace();
+            } finally {
+                processed.add(event);
             }
 
         }
@@ -162,12 +165,11 @@
 
         if (feedJointsOnPipeline == null) {
             feedJointsOnPipeline = new Pair<FeedOperationCounter, 
List<IFeedJoint>>(
-                    new FeedOperationCounter(numOfPrividers, 1), new 
ArrayList<IFeedJoint>());
+                    new FeedOperationCounter(numOfPrividers), new 
ArrayList<IFeedJoint>());
             feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline);
             feedJointsOnPipeline.second.add(feedJoint);
         } else {
             if (!feedJointsOnPipeline.second.contains(feedJoint)) {
-                
feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount()
 + 1);
                 feedJointsOnPipeline.second.add(feedJoint);
             } else {
                 throw new IllegalArgumentException("Feed joint " + feedJoint + 
" already registered");
@@ -175,7 +177,7 @@
         }
     }
 
-    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, 
JobSpecification jobSpec)
+    public synchronized void registerFeedIntakeJob(FeedId feedId, JobId jobId, 
JobSpecification jobSpec)
             throws HyracksDataException {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -207,7 +209,7 @@
         }
     }
 
-    public void registerFeedCollectionJob(FeedId sourceFeedId, 
FeedConnectionId connectionId, JobId jobId,
+    public synchronized void registerFeedCollectionJob(FeedId sourceFeedId, 
FeedConnectionId connectionId, JobId jobId,
             JobSpecification jobSpec, Map<String, String> feedPolicy) {
         if (jobInfos.get(jobId) != null) {
             throw new IllegalStateException("Feed job already registered");
@@ -242,7 +244,7 @@
 
     }
 
-    public void deregisterFeedIntakeJob(JobId jobId) {
+    public synchronized void deregisterFeedIntakeJob(JobId jobId) {
         if (jobInfos.get(jobId) == null) {
             throw new IllegalStateException(" Feed Intake job not registered 
");
         }
@@ -266,7 +268,7 @@
 
     }
 
-    private void handleJobStartEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobStartEvent(FeedEvent message) throws 
Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -279,7 +281,7 @@
 
     }
 
-    private void handleJobFinishEvent(FeedEvent message) throws Exception {
+    private synchronized void handleJobFinishEvent(FeedEvent message) throws 
Exception {
         FeedJobInfo jobInfo = jobInfos.get(message.jobId);
         switch (jobInfo.getJobType()) {
             case INTAKE:
@@ -297,14 +299,27 @@
         }
     }
 
-    private void handleProviderReady(FeedEvent message) {
-        FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId);
-        Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = 
feedPipeline.get(message.feedId);
-        
feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);;
-        if (feedCounter.first.getProvidersCount() == 0) {
-            jobInfo.getIntakeFeedJoint().setState(State.ACTIVE);
-            jobInfo.setState(FeedJobState.ACTIVE);
-            notifyFeedEventSubscribers(jobInfo, 
FeedLifecycleEvent.FEED_INTAKE_STARTED);
+    private synchronized void handlePartitionStart(FeedEvent message) {
+        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
+        switch (jobInfo.getJobType()) {
+            case FEED_CONNECT:
+                ((FeedConnectJobInfo) jobInfo).partitionStart();
+                if (((FeedConnectJobInfo) jobInfo).collectionStarted()) {
+                    notifyFeedEventSubscribers(jobInfo, 
FeedLifecycleEvent.FEED_COLLECT_STARTED);
+                }
+                break;
+            case INTAKE:
+                Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = 
feedPipeline.get(message.feedId);
+                
feedCounter.first.setPartitionCount(feedCounter.first.getPartitionCount() - 1);;
+                if (feedCounter.first.getPartitionCount() == 0) {
+                    ((FeedIntakeInfo) 
jobInfo).getIntakeFeedJoint().setState(State.ACTIVE);
+                    jobInfo.setState(FeedJobState.ACTIVE);
+                    notifyFeedEventSubscribers(jobInfo, 
FeedLifecycleEvent.FEED_INTAKE_STARTED);
+                }
+                break;
+            default:
+                break;
+
         }
     }
 
@@ -335,9 +350,7 @@
     private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws 
RemoteException, ACIDException {
         // set locations of feed sub-operations (intake, compute, store)
         setLocations(cInfo);
-
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = 
feedPipeline.get(cInfo.getConnectionId().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() + 1);
         // activate joints
         List<IFeedJoint> joints = pair.second;
         for (IFeedJoint joint : joints) {
@@ -351,8 +364,6 @@
         cInfo.setState(FeedJobState.ACTIVE);
         // register activity in metadata
         registerFeedActivity(cInfo);
-        // notify event listeners
-        notifyFeedEventSubscribers(cInfo, 
FeedLifecycleEvent.FEED_COLLECT_STARTED);
     }
 
     private void notifyFeedEventSubscribers(FeedJobInfo jobInfo, 
FeedLifecycleEvent event) {
@@ -375,6 +386,9 @@
                 for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
                     subscriber.handleFeedEvent(event);
                 }
+            }
+            if (event == FeedLifecycleEvent.FEED_COLLECT_ENDED) {
+                eventSubscribers.remove(connId);
             }
         }
     }
@@ -416,12 +430,17 @@
         return activeConnections;
     }
 
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId 
connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        boolean active = false;
         FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
         if (cInfo != null) {
-            return cInfo.getState().equals(FeedJobState.ACTIVE);
+            active = cInfo.getState().equals(FeedJobState.ACTIVE);
         }
-        return false;
+        if (active) {
+            registerFeedEventSubscriber(connectionId, eventSubscriber);
+        }
+        return active;
     }
 
     public void setJobState(FeedConnectionId connectionId, FeedJobState 
jobState) {
@@ -433,28 +452,26 @@
         return connectJobInfos.get(connectionId).getState();
     }
 
-    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, 
FeedEvent message) throws Exception {
+    private synchronized void handleFeedIntakeJobFinishMessage(FeedIntakeInfo 
intakeInfo, FeedEvent message)
+            throws Exception {
         IHyracksClientConnection hcc = 
AsterixAppContextInfo.getInstance().getHcc();
         JobInfo info = hcc.getJobInfo(message.jobId);
         JobStatus status = info.getStatus();
         FeedId feedId = intakeInfo.getFeedId();
         Pair<FeedOperationCounter, List<IFeedJoint>> pair = 
feedPipeline.get(feedId);
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
         if (status.equals(JobStatus.FAILURE)) {
             pair.first.setFailedIngestion(true);
         }
         // remove feed joints
         deregisterFeedIntakeJob(message.jobId);
-
         // notify event listeners
-        if (pair.first.getJobsCount() == 0) {
-            feedPipeline.remove(feedId);
-            notifyFeedEventSubscribers(intakeInfo, 
pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : 
FeedLifecycleEvent.FEED_ENDED);
-        }
+        feedPipeline.remove(feedId);
+        intakeJobInfos.remove(feedId);
+        notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion() 
? FeedLifecycleEvent.FEED_INTAKE_FAILURE
+                : FeedLifecycleEvent.FEED_INTAKE_ENDED);
     }
 
-    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) 
throws Exception {
+    private synchronized void 
handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception {
         FeedConnectionId connectionId = cInfo.getConnectionId();
 
         IHyracksClientConnection hcc = 
AsterixAppContextInfo.getInstance().getHcc();
@@ -462,8 +479,6 @@
         JobStatus status = info.getStatus();
         boolean failure = status != null && status.equals(JobStatus.FAILURE);
         FeedPolicyAccessor fpa = new FeedPolicyAccessor(cInfo.getFeedPolicy());
-
-        boolean removeJobHistory = !failure;
         boolean retainSubsription = 
cInfo.getState().equals(FeedJobState.UNDER_RECOVERY)
                 || (failure && fpa.continueOnHardwareFailure());
 
@@ -477,24 +492,13 @@
             removeFeedJointsPostPipelineTermination(cInfo.getConnectionId());
         }
 
-        if (removeJobHistory) {
-            connectJobInfos.remove(connectionId);
-            jobInfos.remove(cInfo.getJobId());
-            feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
-        }
+        connectJobInfos.remove(connectionId);
+        jobInfos.remove(cInfo.getJobId());
+        feedIntakeProgressTrackers.remove(cInfo.getConnectionId());
         deregisterFeedActivity(cInfo);
-
-        Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline
-                .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        pair.first.setJobsCount(pair.first.getJobsCount() - 1);
-        if (pair.first.getJobsCount() == 0) {
-            notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), 
pair.first.isFailedIngestion()
-                    ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : 
FeedLifecycleEvent.FEED_ENDED);
-            
feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId());
-        }
-
         // notify event listeners
-        FeedLifecycleEvent event = failure ? 
FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED;
+        FeedLifecycleEvent event = failure ? 
FeedLifecycleEvent.FEED_COLLECT_FAILURE
+                : FeedLifecycleEvent.FEED_COLLECT_ENDED;
         notifyFeedEventSubscribers(cInfo, event);
     }
 
@@ -548,20 +552,6 @@
     }
 
     public void removeFeedJointsPostPipelineTermination(FeedConnectionId 
connectionId) {
-        FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId);
-        List<IFeedJoint> feedJoints = 
feedPipeline.get(connectionId.getFeedId()).second;
-
-        IFeedJoint sourceJoint = cInfo.getSourceFeedJoint();
-        List<FeedConnectionId> all = sourceJoint.getReceivers();
-        boolean removeSourceJoint = all.size() < 2;
-        if (removeSourceJoint) {
-            feedJoints.remove(sourceJoint);
-        }
-
-        IFeedJoint computeJoint = cInfo.getComputeFeedJoint();
-        if (computeJoint != null && computeJoint.getReceivers().size() < 2) {
-            feedJoints.remove(computeJoint);
-        }
     }
 
     public boolean isRegisteredFeedJob(JobId jobId) {
@@ -594,13 +584,15 @@
         return connectJobInfos.get(connectionId).getJobId();
     }
 
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber) {
+    public boolean registerFeedEventSubscriber(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber subscriber) {
         List<IFeedLifecycleEventSubscriber> subscribers = 
eventSubscribers.get(connectionId);
         if (subscribers == null) {
             subscribers = new ArrayList<IFeedLifecycleEventSubscriber>();
             eventSubscribers.put(connectionId, subscribers);
         }
         subscribers.add(subscriber);
+        return true;
     }
 
     public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber) {
@@ -610,7 +602,7 @@
         }
     }
 
-    //============================
+    // ============================
 
     public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
         List<IFeedJoint> joints = 
feedPipeline.containsKey(feedJointKey.getFeedId())
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
index d7129b8..56d7540 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java
@@ -112,14 +112,16 @@
     }
 
     @Override
-    public void notifyJobStart(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobStart(JobId jobId) throws 
HyracksException {
+        System.err.println(jobId + " started");
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, 
FeedEvent.EventKind.JOB_START));
         }
     }
 
     @Override
-    public void notifyJobFinish(JobId jobId) throws HyracksException {
+    public synchronized void notifyJobFinish(JobId jobId) throws 
HyracksException {
+        System.err.println(jobId + " finished");
         if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) {
             jobEventInbox.add(new FeedEvent(jobId, 
FeedEvent.EventKind.JOB_FINISH));
         } else {
@@ -151,6 +153,7 @@
      */
     @Override
     public void notifyJobCreation(JobId jobId, 
IActivityClusterGraphGeneratorFactory acggf) throws HyracksException {
+        System.err.println(jobId + " has been created");
         JobSpecification spec = acggf.getJobSpecification();
         FeedConnectionId feedConnectionId = null;
         Map<String, String> feedPolicy = null;
@@ -185,7 +188,7 @@
         public enum EventKind {
             JOB_START,
             JOB_FINISH,
-            PROVIDER_READY
+            PARTITION_START
         }
 
         public EventKind eventKind;
@@ -446,8 +449,9 @@
     }
 
     @Override
-    public boolean isFeedConnectionActive(FeedConnectionId connectionId) {
-        return feedJobNotificationHandler.isFeedConnectionActive(connectionId);
+    public synchronized boolean isFeedConnectionActive(FeedConnectionId 
connectionId,
+            IFeedLifecycleEventSubscriber eventSubscriber) {
+        return feedJobNotificationHandler.isFeedConnectionActive(connectionId, 
eventSubscriber);
     }
 
     public void reportPartialDisconnection(FeedConnectionId connectionId) {
@@ -485,8 +489,9 @@
     }
 
     @Override
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber) {
-        feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, 
subscriber);
+    public boolean registerFeedEventSubscriber(FeedConnectionId connectionId,
+            IFeedLifecycleEventSubscriber subscriber) {
+        return 
feedJobNotificationHandler.registerFeedEventSubscriber(connectionId, 
subscriber);
     }
 
     @Override
@@ -503,8 +508,9 @@
         return feedJobNotificationHandler.getFeedCollectJobId(connectionId);
     }
 
-    public void notifyProviderReady(FeedId feedId, JobId jobId) {
-        jobEventInbox.add(new FeedEvent(jobId, 
FeedEvent.EventKind.PROVIDER_READY, feedId));
+    public synchronized void notifyPartitionStart(FeedId feedId, JobId jobId) {
+        System.err.println("Provider for " + jobId + " is reporting");
+        jobEventInbox.add(new FeedEvent(jobId, 
FeedEvent.EventKind.PARTITION_START, feedId));
     }
 
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
index 3786413..ef65abb 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java
@@ -274,150 +274,156 @@
             
metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider);
             metadataProvider.setOutputFile(outputFile);
             metadataProvider.setConfig(config);
-            switch (stmt.getKind()) {
-                case SET: {
-                    handleSetStatement(metadataProvider, stmt, config);
-                    break;
-                }
-                case DATAVERSE_DECL: {
-                    activeDefaultDataverse = 
handleUseDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case CREATE_DATAVERSE: {
-                    handleCreateDataverseStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATASET_DECL: {
-                    handleCreateDatasetStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case CREATE_INDEX: {
-                    handleCreateIndexStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DECL: {
-                    handleCreateTypeStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DECL: {
-                    handleCreateNodeGroupStatement(metadataProvider, stmt);
-                    break;
-                }
-                case DATAVERSE_DROP: {
-                    handleDataverseDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DATASET_DROP: {
-                    handleDatasetDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INDEX_DROP: {
-                    handleIndexDropStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case TYPE_DROP: {
-                    handleTypeDropStatement(metadataProvider, stmt);
-                    break;
-                }
-                case NODEGROUP_DROP: {
-                    handleNodegroupDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case CREATE_FUNCTION: {
-                    handleCreateFunctionStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case FUNCTION_DROP: {
-                    handleFunctionDropStatement(metadataProvider, stmt);
-                    break;
-                }
-
-                case LOAD: {
-                    handleLoadStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case INSERT:
-                case UPSERT: {
-                    handleInsertUpsertStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-                case DELETE: {
-                    handleDeleteStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_PRIMARY_FEED:
-                case CREATE_SECONDARY_FEED: {
-                    handleCreateFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED: {
-                    handleDropFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DROP_FEED_POLICY: {
-                    handleDropFeedPolicyStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CONNECT_FEED: {
-                    handleConnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case DISCONNECT_FEED: {
-                    handleDisconnectFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case SUBSCRIBE_FEED: {
-                    handleSubscribeFeedStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case CREATE_FEED_POLICY: {
-                    handleCreateFeedPolicyStatement(metadataProvider, stmt, 
hcc);
-                    break;
-                }
-
-                case QUERY: {
-                    metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
-                    metadataProvider.setResultAsyncMode(
-                            resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.ASYNC_DEFERRED);
-                    handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, stats);
-                    break;
-                }
-
-                case COMPACT: {
-                    handleCompactStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
-
-                case EXTERNAL_DATASET_REFRESH: {
-                    handleExternalDatasetRefreshStatement(metadataProvider, 
stmt, hcc);
-                    break;
-                }
-
-                case WRITE: {
-                    Pair<IAWriterFactory, FileSplit> result = 
handleWriteStatement(metadataProvider, stmt);
-                    if (result.first != null) {
-                        writerFactory = result.first;
+            String threadNameBefore = Thread.currentThread().getName();
+            Thread.currentThread().setName("QueryTranslator: " + 
stmt.getKind());
+            try {
+                switch (stmt.getKind()) {
+                    case SET: {
+                        handleSetStatement(metadataProvider, stmt, config);
+                        break;
                     }
-                    outputFile = result.second;
-                    break;
-                }
+                    case DATAVERSE_DECL: {
+                        activeDefaultDataverse = 
handleUseDataverseStatement(metadataProvider, stmt);
+                        break;
+                    }
+                    case CREATE_DATAVERSE: {
+                        handleCreateDataverseStatement(metadataProvider, stmt);
+                        break;
+                    }
+                    case DATASET_DECL: {
+                        handleCreateDatasetStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+                    case CREATE_INDEX: {
+                        handleCreateIndexStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+                    case TYPE_DECL: {
+                        handleCreateTypeStatement(metadataProvider, stmt);
+                        break;
+                    }
+                    case NODEGROUP_DECL: {
+                        handleCreateNodeGroupStatement(metadataProvider, stmt);
+                        break;
+                    }
+                    case DATAVERSE_DROP: {
+                        handleDataverseDropStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+                    case DATASET_DROP: {
+                        handleDatasetDropStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+                    case INDEX_DROP: {
+                        handleIndexDropStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+                    case TYPE_DROP: {
+                        handleTypeDropStatement(metadataProvider, stmt);
+                        break;
+                    }
+                    case NODEGROUP_DROP: {
+                        handleNodegroupDropStatement(metadataProvider, stmt);
+                        break;
+                    }
 
-                case RUN: {
-                    handleRunStatement(metadataProvider, stmt, hcc);
-                    break;
-                }
+                    case CREATE_FUNCTION: {
+                        handleCreateFunctionStatement(metadataProvider, stmt);
+                        break;
+                    }
 
-                default:
-                    break;
+                    case FUNCTION_DROP: {
+                        handleFunctionDropStatement(metadataProvider, stmt);
+                        break;
+                    }
+
+                    case LOAD: {
+                        handleLoadStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+                    case INSERT:
+                    case UPSERT: {
+                        handleInsertUpsertStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+                    case DELETE: {
+                        handleDeleteStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    case CREATE_PRIMARY_FEED:
+                    case CREATE_SECONDARY_FEED: {
+                        handleCreateFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    case DROP_FEED: {
+                        handleDropFeedStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    case DROP_FEED_POLICY: {
+                        handleDropFeedPolicyStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+
+                    case CONNECT_FEED: {
+                        handleConnectFeedStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+
+                    case DISCONNECT_FEED: {
+                        handleDisconnectFeedStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+
+                    case SUBSCRIBE_FEED: {
+                        handleSubscribeFeedStatement(metadataProvider, stmt, 
hcc);
+                        break;
+                    }
+
+                    case CREATE_FEED_POLICY: {
+                        handleCreateFeedPolicyStatement(metadataProvider, 
stmt, hcc);
+                        break;
+                    }
+
+                    case QUERY: {
+                        metadataProvider.setResultSetId(new 
ResultSetId(resultSetIdCounter++));
+                        metadataProvider.setResultAsyncMode(resultDelivery == 
ResultDelivery.ASYNC
+                                || resultDelivery == 
ResultDelivery.ASYNC_DEFERRED);
+                        handleQuery(metadataProvider, (Query) stmt, hcc, hdc, 
resultDelivery, stats);
+                        break;
+                    }
+
+                    case COMPACT: {
+                        handleCompactStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    case EXTERNAL_DATASET_REFRESH: {
+                        
handleExternalDatasetRefreshStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    case WRITE: {
+                        Pair<IAWriterFactory, FileSplit> result = 
handleWriteStatement(metadataProvider, stmt);
+                        if (result.first != null) {
+                            writerFactory = result.first;
+                        }
+                        outputFile = result.second;
+                        break;
+                    }
+
+                    case RUN: {
+                        handleRunStatement(metadataProvider, stmt, hcc);
+                        break;
+                    }
+
+                    default:
+                        break;
+                }
+            } finally {
+                Thread.currentThread().setName(threadNameBefore);
             }
         }
     }
@@ -609,9 +615,9 @@
                     }
                     if (compactionPolicy == null) {
                         if (filterField != null) {
-                            //If the dataset has a filter and the user didn't 
specify a merge
-                            //policy, then we will pick the
-                            //correlated-prefix as the default merge policy.
+                            // If the dataset has a filter and the user didn't 
specify a merge
+                            // policy, then we will pick the
+                            // correlated-prefix as the default merge policy.
                             compactionPolicy = 
GlobalConfig.DEFAULT_FILTERED_DATASET_COMPACTION_POLICY_NAME;
                             compactionPolicyProperties = 
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES;
                         }
@@ -632,12 +638,12 @@
 
             }
 
-            //#. initialize DatasetIdFactory if it is not initialized.
+            // #. initialize DatasetIdFactory if it is not initialized.
             if (!DatasetIdFactory.isInitialized()) {
                 
DatasetIdFactory.initialize(MetadataManager.INSTANCE.getMostRecentDatasetId());
             }
 
-            //#. add a new dataset with PendingAddOp
+            // #. add a new dataset with PendingAddOp
             dataset = new Dataset(dataverseName, datasetName, 
itemTypeDataverseName, itemTypeName,
                     metaItemTypeDataverseName, metaItemTypeName, ngName, 
compactionPolicy, compactionPolicyProperties,
                     datasetDetails, dd.getHints(), dsType, 
DatasetIdFactory.generateDatasetId(),
@@ -650,21 +656,21 @@
                 JobSpecification jobSpec = 
DatasetOperations.createDatasetJobSpec(dataverse, datasetName,
                         metadataProvider);
 
-                //#. make metadataTxn commit before calling runJob.
+                // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. runJob
+                // #. runJob
                 JobUtils.runJob(hcc, jobSpec, true);
 
-                //#. begin new metadataTxn
+                // #. begin new metadataTxn
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. add a new dataset with PendingNoOp after deleting the dataset 
with PendingAddOp
+            // #. add a new dataset with PendingNoOp after deleting the 
dataset with PendingAddOp
             
MetadataManager.INSTANCE.dropDataset(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName);
             dataset.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             
MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), 
dataset);
@@ -676,11 +682,11 @@
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
 
-                //#. execute compensation operations
-                //remove the index in NC
-                //[Notice]
-                //As long as we updated(and committed) metadata, we should 
remove any effect of the job
-                //because an exception occurs during runJob.
+                // #. execute compensation operations
+                // remove the index in NC
+                // [Notice]
+                // As long as we updated(and committed) metadata, we should 
remove any effect of the job
+                // because an exception occurs during runJob.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -697,7 +703,7 @@
                     }
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -820,7 +826,7 @@
         String indexName = null;
         JobSpecification spec = null;
         Dataset ds = null;
-        //For external datasets
+        // For external datasets
         ArrayList<ExternalFile> externalFilesSnapshot = null;
         boolean firstExternalDatasetIndex = false;
         boolean filesIndexReplicated = false;
@@ -904,10 +910,10 @@
                 }
             }
 
-            //Checks whether a user is trying to create an inverted secondary 
index on a dataset
-            //with a variable-length primary key.
-            //Currently, we do not support this. Therefore, as a temporary 
solution, we print an
-            //error message and stop.
+            // Checks whether a user is trying to create an inverted secondary 
index on a dataset
+            // with a variable-length primary key.
+            // Currently, we do not support this. Therefore, as a temporary 
solution, we print an
+            // error message and stop.
             if (stmtCreateIndex.getIndexType() == 
IndexType.SINGLE_PARTITION_WORD_INVIX
                     || stmtCreateIndex.getIndexType() == 
IndexType.SINGLE_PARTITION_NGRAM_INVIX
                     || stmtCreateIndex.getIndexType() == 
IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -917,7 +923,7 @@
                     IAType keyType = 
aRecordType.getSubFieldType(partitioningKey);
                     ITypeTraits typeTrait = 
AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
 
-                    //If it is not a fixed length
+                    // If it is not a fixed length
                     if (typeTrait.getFixedLength() < 0) {
                         throw new AlgebricksException("The keyword or ngram 
index -" + indexName
                                 + " cannot be created on the dataset -" + 
datasetName
@@ -930,27 +936,27 @@
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
                 validateIfResourceIsActiveInFeed(dataverseName, datasetName);
             } else {
-                //External dataset
-                //Check if the dataset is indexible
+                // External dataset
+                // Check if the dataset is indexible
                 if 
(!ExternalIndexingOperations.isIndexible((ExternalDatasetDetails) 
ds.getDatasetDetails())) {
                     throw new AlgebricksException(
                             "dataset using " + ((ExternalDatasetDetails) 
ds.getDatasetDetails()).getAdapter()
                                     + " Adapter can't be indexed");
                 }
-                //Check if the name of the index is valid
+                // Check if the name of the index is valid
                 if (!ExternalIndexingOperations.isValidIndexName(datasetName, 
indexName)) {
                     throw new AlgebricksException("external dataset index name 
is invalid");
                 }
 
-                //Check if the files index exist
+                // Check if the files index exist
                 filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                         datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
                 firstExternalDatasetIndex = (filesIndex == null);
-                //Lock external dataset
+                // Lock external dataset
                 ExternalDatasetsRegistry.INSTANCE.buildIndexBegin(ds, 
firstExternalDatasetIndex);
                 datasetLocked = true;
                 if (firstExternalDatasetIndex) {
-                    //Verify that no one has created an index before we 
acquire the lock
+                    // Verify that no one has created an index before we 
acquire the lock
                     filesIndex = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
                             dataverseName, datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
                     if (filesIndex != null) {
@@ -960,20 +966,20 @@
                     }
                 }
                 if (firstExternalDatasetIndex) {
-                    //Get snapshot from External File System
+                    // Get snapshot from External File System
                     externalFilesSnapshot = 
ExternalIndexingOperations.getSnapshotFromExternalFileSystem(ds);
-                    //Add an entry for the files index
+                    // Add an entry for the files index
                     filesIndex = new Index(dataverseName, datasetName,
                             
ExternalIndexingOperations.getFilesIndexName(datasetName), IndexType.BTREE,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_NAMES, 
null,
                             ExternalIndexingOperations.FILE_INDEX_FIELD_TYPES, 
false, false,
                             IMetadataEntity.PENDING_ADD_OP);
                     
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
-                    //Add files to the external files index
+                    // Add files to the external files index
                     for (ExternalFile file : externalFilesSnapshot) {
                         MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, 
file);
                     }
-                    //This is the first index for the external dataset, 
replicate the files index
+                    // This is the first index for the external dataset, 
replicate the files index
                     spec = 
ExternalIndexingOperations.buildFilesIndexReplicationJobSpec(ds, 
externalFilesSnapshot,
                             metadataProvider, true);
                     if (spec == null) {
@@ -985,7 +991,7 @@
                 }
             }
 
-            //check whether there exists another enforced index on the same 
field
+            // check whether there exists another enforced index on the same 
field
             if (stmtCreateIndex.isEnforced()) {
                 List<Index> indexes = MetadataManager.INSTANCE
                         
.getDatasetIndexes(metadataProvider.getMetadataTxnContext(), dataverseName, 
datasetName);
@@ -999,7 +1005,7 @@
                 }
             }
 
-            //#. add a new index with PendingAddOp
+            // #. add a new index with PendingAddOp
             Index index = new Index(dataverseName, datasetName, indexName, 
stmtCreateIndex.getIndexType(), indexFields,
                     keySourceIndicators, indexFieldTypes, 
stmtCreateIndex.getGramLength(), stmtCreateIndex.isEnforced(),
                     false, IMetadataEntity.PENDING_ADD_OP);
@@ -1011,7 +1017,7 @@
                         Lists.newArrayList(index));
             }
 
-            //#. prepare to create the index artifact in NC.
+            // #. prepare to create the index artifact in NC.
             CompiledCreateIndexStatement cis = new 
CompiledCreateIndexStatement(index.getIndexName(), dataverseName,
                     index.getDatasetName(), index.getKeyFieldNames(), 
index.getKeyFieldTypes(),
                     index.isEnforcingKeyFileds(), index.getGramLength(), 
index.getIndexType());
@@ -1025,14 +1031,14 @@
 
             progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-            //#. create the index artifact in NC.
+            // #. create the index artifact in NC.
             JobUtils.runJob(hcc, spec, true);
 
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. load data into the index in NC.
+            // #. load data into the index in NC.
             cis = new CompiledCreateIndexStatement(index.getIndexName(), 
dataverseName, index.getDatasetName(),
                     index.getKeyFieldNames(), index.getKeyFieldTypes(), 
index.isEnforcingKeyFileds(),
                     index.getGramLength(), index.getIndexType());
@@ -1042,24 +1048,24 @@
 
             JobUtils.runJob(hcc, spec, true);
 
-            //#. begin new metadataTxn
+            // #. begin new metadataTxn
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. add another new index with PendingNoOp after deleting the 
index with PendingAddOp
+            // #. add another new index with PendingNoOp after deleting the 
index with PendingAddOp
             
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
                     indexName);
             index.setPendingOp(IMetadataEntity.PENDING_NO_OP);
             
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
index);
-            //add another new files index with PendingNoOp after deleting the 
index with
-            //PendingAddOp
+            // add another new files index with PendingNoOp after deleting the 
index with
+            // PendingAddOp
             if (firstExternalDatasetIndex) {
                 
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName, datasetName,
                         filesIndex.getIndexName());
                 filesIndex.setPendingOp(IMetadataEntity.PENDING_NO_OP);
                 
MetadataManager.INSTANCE.addIndex(metadataProvider.getMetadataTxnContext(), 
filesIndex);
-                //update transaction timestamp
+                // update transaction timestamp
                 ((ExternalDatasetDetails) 
ds.getDatasetDetails()).setRefreshTimestamp(new Date());
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
             }
@@ -1069,7 +1075,7 @@
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
             }
-            //If files index was replicated for external dataset, it should be 
cleaned up on NC side
+            // If files index was replicated for external dataset, it should 
be cleaned up on NC side
             if (filesIndexReplicated) {
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
@@ -1090,8 +1096,8 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the index in NC
+                // #. execute compensation operations
+                // remove the index in NC
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -1114,7 +1120,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop External Files from metadata
+                        // Drop External Files from metadata
                         
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     } catch (Exception e2) {
@@ -1126,7 +1132,7 @@
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     metadataProvider.setMetadataTxnContext(mdTxnCtx);
                     try {
-                        //Drop the files index from metadata
+                        // Drop the files index from metadata
                         
MetadataManager.INSTANCE.dropIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                                 datasetName, 
ExternalIndexingOperations.getFilesIndexName(datasetName));
                         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -1138,7 +1144,7 @@
                                 + ") couldn't be removed from the metadata", 
e);
                     }
                 }
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1221,7 +1227,7 @@
                 }
             }
 
-            //# disconnect all feeds from any datasets in the dataverse.
+            // # disconnect all feeds from any datasets in the dataverse.
             List<FeedConnectionId> activeFeedConnections = 
FeedLifecycleListener.INSTANCE
                     .getActiveFeedConnections(null);
             DisconnectFeedStatement disStmt = null;
@@ -1243,13 +1249,13 @@
                                     + connection.getDatasetName() + ". 
Encountered exception " + exception);
                         }
                     }
-                    //prepare job to remove feed log storage
+                    // prepare job to remove feed log storage
                     jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(
                             MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverseName, feedId.getFeedName())));
                 }
             }
 
-            //#. prepare jobs which will drop corresponding datasets with 
indexes.
+            // #. prepare jobs which will drop corresponding datasets with 
indexes.
             List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
             for (int j = 0; j < datasets.size(); j++) {
                 String datasetName = datasets.get(j).getDatasetName();
@@ -1269,7 +1275,7 @@
                     CompiledDatasetDropStatement cds = new 
CompiledDatasetDropStatement(dataverseName, datasetName);
                     
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, 
metadataProvider));
                 } else {
-                    //External dataset
+                    // External dataset
                     List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                             datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
@@ -1289,10 +1295,10 @@
                 }
             }
             
jobsToExecute.add(DataverseOperations.createDropDataverseJobSpec(dv, 
metadataProvider));
-            //#. mark PendingDropOp on the dataverse record by
-            //first, deleting the dataverse record from the DATAVERSE_DATASET
-            //second, inserting the dataverse record with the PendingDropOp 
value into the
-            //DATAVERSE_DATASET
+            // #. mark PendingDropOp on the dataverse record by
+            // first, deleting the dataverse record from the DATAVERSE_DATASET
+            // second, inserting the dataverse record with the PendingDropOp 
value into the
+            // DATAVERSE_DATASET
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             MetadataManager.INSTANCE.addDataverse(mdTxnCtx,
                     new Dataverse(dataverseName, dv.getDataFormat(), 
IMetadataEntity.PENDING_DROP_OP));
@@ -1309,7 +1315,7 @@
             bActiveTxn = true;
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-            //#. finally, delete the dataverse.
+            // #. finally, delete the dataverse.
             MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, dataverseName);
             if (activeDefaultDataverse != null && 
activeDefaultDataverse.getDataverseName() == dataverseName) {
                 activeDefaultDataverse = null;
@@ -1325,18 +1331,18 @@
                     activeDefaultDataverse = null;
                 }
 
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove the all indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to 
be compensated.
+                    // do no throw exception since still the metadata needs to 
be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 try {
                     MetadataManager.INSTANCE.dropDataverse(mdTxnCtx, 
dataverseName);
@@ -1383,7 +1389,7 @@
 
             Map<FeedConnectionId, Pair<JobSpecification, Boolean>> 
disconnectJobList = new HashMap<FeedConnectionId, Pair<JobSpecification, 
Boolean>>();
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
-                //prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
+                // prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
                 List<FeedConnectionId> feedConnections = 
FeedLifecycleListener.INSTANCE.getActiveFeedConnections(null);
                 if (feedConnections != null && !feedConnections.isEmpty()) {
                     for (FeedConnectionId connection : feedConnections) {
@@ -1394,14 +1400,14 @@
                             LOGGER.info("Disconnecting feed " + 
connection.getFeedId().getFeedName() + " from dataset "
                                     + datasetName + " as dataset is being 
dropped");
                         }
-                        //prepare job to remove feed log storage
+                        // prepare job to remove feed log storage
                         jobsToExecute
                                 
.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(mdTxnCtx,
                                         connection.getFeedId().getDataverse(), 
connection.getFeedId().getFeedName())));
                     }
                 }
 
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the datatset and the indexes in NC
                 List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if (indexes.get(j).isSecondaryIndex()) {
@@ -1413,7 +1419,7 @@
                 CompiledDatasetDropStatement cds = new 
CompiledDatasetDropStatement(dataverseName, datasetName);
                 
jobsToExecute.add(DatasetOperations.createDropDatasetJobSpec(cds, 
metadataProvider));
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, 
datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, 
ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1426,12 +1432,12 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //# disconnect the feeds
+                // # disconnect the feeds
                 for (Pair<JobSpecification, Boolean> p : 
disconnectJobList.values()) {
                     JobUtils.runJob(hcc, p.first, true);
                 }
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1440,9 +1446,9 @@
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             } else {
-                //External dataset
+                // External dataset
                 ExternalDatasetsRegistry.INSTANCE.removeDatasetInfo(ds);
-                //#. prepare jobs to drop the datatset and the indexes in NC
+                // #. prepare jobs to drop the dataset and the indexes in NC
                 List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
                 for (int j = 0; j < indexes.size(); j++) {
                     if 
(ExternalIndexingOperations.isFileIndex(indexes.get(j))) {
@@ -1457,7 +1463,7 @@
                     }
                 }
 
-                //#. mark the existing dataset as PendingDropOp
+                // #. mark the existing dataset as PendingDropOp
                 MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, 
datasetName);
                 MetadataManager.INSTANCE.addDataset(mdTxnCtx,
                         new Dataset(dataverseName, datasetName, 
ds.getItemTypeDataverseName(), ds.getItemTypeName(),
@@ -1469,7 +1475,7 @@
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
 
-                //#. run the jobs
+                // #. run the jobs
                 for (JobSpecification jobSpec : jobsToExecute) {
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
@@ -1481,9 +1487,9 @@
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
             }
 
-            //#. finally, delete the dataset.
+            // #. finally, delete the dataset.
             MetadataManager.INSTANCE.dropDataset(mdTxnCtx, dataverseName, 
datasetName);
-            //Drop the associated nodegroup
+            // Drop the associated nodegroup
             String nodegroup = ds.getNodeGroupName();
             if 
(!nodegroup.equalsIgnoreCase(MetadataConstants.METADATA_DEFAULT_NODEGROUP_NAME))
 {
                 MetadataManager.INSTANCE.dropNodegroup(mdTxnCtx, dataverseName 
+ ":" + datasetName);
@@ -1496,18 +1502,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to 
be compensated.
+                    // do not throw an exception; the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1542,7 +1548,7 @@
         MetadataLockManager.INSTANCE.dropIndexBegin(dataverseName, 
dataverseName + "." + datasetName);
 
         String indexName = null;
-        //For external index
+        // For external index
         boolean dropFilesIndex = false;
         List<JobSpecification> jobsToExecute = new 
ArrayList<JobSpecification>();
         try {
@@ -1581,18 +1587,18 @@
                         throw new AlgebricksException("There is no index with 
this name " + indexName + ".");
                     }
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, 
index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), 
index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), 
index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1601,15 +1607,15 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
             } else {
-                //External dataset
+                // External dataset
                 indexName = stmtIndexDrop.getIndexName().getValue();
                 Index index = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataverseName, datasetName, indexName);
                 if (index == null) {
@@ -1622,21 +1628,21 @@
                 } else if (ExternalIndexingOperations.isFileIndex(index)) {
                     throw new AlgebricksException("Dropping a dataset's files 
index is not allowed.");
                 }
-                //#. prepare a job to drop the index in NC.
+                // #. prepare a job to drop the index in NC.
                 CompiledIndexDropStatement cds = new 
CompiledIndexDropStatement(dataverseName, datasetName, indexName);
                 
jobsToExecute.add(IndexOperations.buildDropSecondaryIndexJobSpec(cds, 
metadataProvider, ds));
                 List<Index> datasetIndexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
                         datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
-                    //only one index + the files index, we need to delete both 
of the indexes
+                    // only one index plus the files index remain, so we need to delete both indexes
                     for (Index externalIndex : datasetIndexes) {
                         if 
(ExternalIndexingOperations.isFileIndex(externalIndex)) {
                             cds = new 
CompiledIndexDropStatement(dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             jobsToExecute.add(
                                     
ExternalIndexingOperations.buildDropFilesIndexJobSpec(cds, metadataProvider, 
ds));
-                            //#. mark PendingDropOp on the existing files index
+                            // #. mark PendingDropOp on the existing files 
index
                             MetadataManager.INSTANCE.dropIndex(mdTxnCtx, 
dataverseName, datasetName,
                                     externalIndex.getIndexName());
                             MetadataManager.INSTANCE.addIndex(mdTxnCtx,
@@ -1649,14 +1655,14 @@
                     }
                 }
 
-                //#. mark PendingDropOp on the existing index
+                // #. mark PendingDropOp on the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
                 MetadataManager.INSTANCE.addIndex(mdTxnCtx,
                         new Index(dataverseName, datasetName, indexName, 
index.getIndexType(), index.getKeyFieldNames(),
                                 index.getKeyFieldSourceIndicators(), 
index.getKeyFieldTypes(),
                                 index.isEnforcingKeyFileds(), 
index.isPrimaryIndex(), IMetadataEntity.PENDING_DROP_OP));
 
-                //#. commit the existing transaction before calling runJob.
+                // #. commit the existing transaction before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 bActiveTxn = false;
                 progress = ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA;
@@ -1665,15 +1671,15 @@
                     JobUtils.runJob(hcc, jobSpec, true);
                 }
 
-                //#. begin a new transaction
+                // #. begin a new transaction
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
 
-                //#. finally, delete the existing index
+                // #. finally, delete the existing index
                 MetadataManager.INSTANCE.dropIndex(mdTxnCtx, dataverseName, 
datasetName, indexName);
                 if (dropFilesIndex) {
-                    //delete the files index too
+                    // delete the files index too
                     MetadataManager.INSTANCE.dropIndex(mdTxnCtx, 
dataverseName, datasetName,
                             
ExternalIndexingOperations.getFilesIndexName(datasetName));
                     
MetadataManager.INSTANCE.dropDatasetExternalFiles(mdTxnCtx, ds);
@@ -1688,18 +1694,18 @@
             }
 
             if (progress == ProgressState.ADDED_PENDINGOP_RECORD_TO_METADATA) {
-                //#. execute compensation operations
-                //remove the all indexes in NC
+                // #. execute compensation operations
+                // remove all the indexes in NC
                 try {
                     for (JobSpecification jobSpec : jobsToExecute) {
                         JobUtils.runJob(hcc, jobSpec, true);
                     }
                 } catch (Exception e2) {
-                    //do no throw exception since still the metadata needs to 
be compensated.
+                    // do not throw an exception; the metadata still needs to be compensated.
                     e.addSuppressed(e2);
                 }
 
-                //remove the record from the metadata.
+                // remove the record from the metadata.
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
                 try {
@@ -1953,11 +1959,11 @@
             ICompiledDmlStatement stmt)
             throws AsterixException, RemoteException, AlgebricksException, 
JSONException, ACIDException {
 
-        //Query Rewriting (happens under the same ongoing metadata transaction)
+        // Query Rewriting (happens under the same ongoing metadata 
transaction)
         Pair<Query, Integer> reWrittenQuery = 
apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query,
                 sessionConfig);
 
-        //Query Compilation (happens under the same ongoing metadata 
transaction)
+        // Query Compilation (happens under the same ongoing metadata 
transaction)
         JobSpecification spec = apiFramework.compileQuery(declaredFunctions, 
metadataProvider, reWrittenQuery.first,
                 reWrittenQuery.second, stmt == null ? null : 
stmt.getDatasetName(), sessionConfig, stmt);
 
@@ -2178,7 +2184,8 @@
 
             feedConnId = new FeedConnectionId(dataverseName, 
cfs.getFeedName(), cfs.getDatasetName().getValue());
 
-            if 
(FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId)) {
+            subscriberRegistered = 
FeedLifecycleListener.INSTANCE.isFeedConnectionActive(feedConnId, 
eventSubscriber);
+            if (subscriberRegistered) {
                 throw new AsterixException("Feed " + cfs.getFeedName() + " is 
already connected to dataset "
                         + cfs.getDatasetName().getValue());
             }
@@ -2186,24 +2193,23 @@
             FeedPolicyEntity feedPolicy = 
FeedMetadataUtil.validateIfPolicyExists(dataverseName, cbfs.getPolicyName(),
                     mdTxnCtx);
 
-            //All Metadata checks have passed. Feed connect request is valid. 
//
+            // All metadata checks have passed. The feed connect request is valid.
 
             FeedPolicyAccessor policyAccessor = new 
FeedPolicyAccessor(feedPolicy.getProperties());
             Triple<FeedConnectionRequest, Boolean, List<IFeedJoint>> triple = 
getFeedConnectionRequest(dataverseName,
                     feed, cbfs.getDatasetName(), feedPolicy, mdTxnCtx);
             FeedConnectionRequest connectionRequest = triple.first;
             boolean createFeedIntakeJob = triple.second;
-
-            
FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId, 
eventSubscriber);
-            subscriberRegistered = true;
+            subscriberRegistered = 
FeedLifecycleListener.INSTANCE.registerFeedEventSubscriber(feedConnId,
+                    eventSubscriber);
             if (createFeedIntakeJob) {
                 FeedId feedId = 
connectionRequest.getFeedJointKey().getFeedId();
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
feedId.getDataverse(),
                         feedId.getFeedName());
                 Pair<JobSpecification, IAdapterFactory> pair = 
FeedOperations.buildFeedIntakeJobSpec(primaryFeed,
                         metadataProvider, policyAccessor);
-                //adapter configuration are valid at this stage
-                //register the feed joints (these are auto-de-registered)
+                // adapter configurations are valid at this stage
+                // register the feed joints (these are auto-de-registered)
                 int numOfPrividers = 
pair.second.getPartitionConstraint().getLocations().length;
                 for (IFeedJoint fj : triple.third) {
                     FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, 
numOfPrividers);
@@ -2227,7 +2233,7 @@
             bActiveTxn = false;
             
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_STARTED);
             if 
(Boolean.valueOf(metadataProvider.getConfig().get(ConnectFeedStatement.WAIT_FOR_COMPLETION)))
 {
-                eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_ENDED); // 
blocking call
+                
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED); // blocking 
call
             }
         } catch (Exception e) {
             if (bActiveTxn) {
@@ -2270,7 +2276,7 @@
         boolean isFeedJointAvailable = 
FeedLifecycleListener.INSTANCE.isFeedJointAvailable(feedJointKey);
         if (!isFeedJointAvailable) {
             sourceFeedJoint = 
FeedLifecycleListener.INSTANCE.getAvailableFeedJoint(feedJointKey);
-            if (sourceFeedJoint == null) { //the feed is currently not being 
ingested, i.e., it is unavailable.
+            if (sourceFeedJoint == null) { // the feed is currently not being 
ingested, i.e., it is unavailable.
                 connectionLocation = FeedRuntimeType.INTAKE;
                 FeedId sourceFeedId = feedJointKey.getFeedId(); // the 
root/primary feedId
                 Feed primaryFeed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, 
dataverse, sourceFeedId.getFeedName());
@@ -2290,8 +2296,8 @@
                     functionsToApply.add(f);
                 }
             }
-            //register the compute feed point that represents the final output 
from the collection of
-            //functions that will be applied.
+            // register the compute feed point that represents the final 
output from the collection of
+            // functions that will be applied.
             if (!functionsToApply.isEmpty()) {
                 FeedJointKey computeFeedJointKey = new 
FeedJointKey(feed.getFeedId(), functionsToApply);
                 IFeedJoint computeFeedJoint = new 
FeedJoint(computeFeedJointKey, feed.getFeedId(),
@@ -2341,7 +2347,6 @@
         DisconnectFeedStatement cfs = (DisconnectFeedStatement) stmt;
         String dataverseName = getActiveDataverse(cfs.getDataverseName());
         String datasetName = cfs.getDatasetName().getValue();
-
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         boolean bActiveTxn = true;
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2350,7 +2355,9 @@
         Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName, 
cfs.getFeedName().getValue(), mdTxnCtx);
 
         FeedConnectionId connectionId = new FeedConnectionId(feed.getFeedId(), 
cfs.getDatasetName().getValue());
-        boolean isFeedConnectionActive = 
FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId);
+        IFeedLifecycleEventSubscriber eventSubscriber = new 
FeedLifecycleEventSubscriber();
+        boolean isFeedConnectionActive = 
FeedLifecycleListener.INSTANCE.isFeedConnectionActive(connectionId,
+                eventSubscriber);
         if (!isFeedConnectionActive) {
             throw new AsterixException("Feed " + 
feed.getFeedId().getFeedName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid 
operation!");
@@ -2377,7 +2384,7 @@
                 
CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(connectionId);
                 
FeedLifecycleListener.INSTANCE.reportPartialDisconnection(connectionId);
             }
-
+            eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_COLLECT_ENDED);
         } catch (Exception e) {
             if (bActiveTxn) {
                 abort(e, e, mdTxnCtx);
@@ -2463,7 +2470,7 @@
             Datatype dt = 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
                     ds.getItemTypeDataverseName(), itemTypeName);
 
-            //Prepare jobs to compact the datatset and its indexes
+            // Prepare jobs to compact the dataset and its indexes
             List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException(
@@ -2500,7 +2507,7 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
 
-            //#. run the jobs
+            // #. run the jobs
             for (JobSpecification jobSpec : jobsToExecute) {
                 JobUtils.runJob(hcc, jobSpec, true);
             }
@@ -2548,9 +2555,9 @@
                         ResultReader resultReader = new ResultReader(hcc, hdc);
                         resultReader.open(jobId, 
metadataProvider.getResultSetId());
 
-                        //In this case (the normal case), we don't use the
-                        //"response" JSONObject - just stream the results
-                        //to the "out" PrintWriter
+                        // In this case (the normal case), we don't use the
+                        // "response" JSONObject - just stream the results
+                        // to the "out" PrintWriter
                         if (sessionConfig.fmt() == OutputFormat.CSV
                                 && 
sessionConfig.is(SessionConfig.FORMAT_CSV_HEADER)) {
                             
ResultUtils.displayCSVHeader(metadataProvider.findOutputRecordType(), 
sessionConfig);
@@ -2579,7 +2586,7 @@
             throw e;
         } finally {
             MetadataLockManager.INSTANCE.queryEnd(query.getDataverses(), 
query.getDatasets());
-            //release external datasets' locks acquired during compilation of 
the query
+            // release external datasets' locks acquired during compilation of 
the query
             
ExternalDatasetsRegistry.INSTANCE.releaseAcquiredLocks(metadataProvider);
         }
     }
@@ -2640,56 +2647,56 @@
             ds = 
MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(), 
dataverseName,
                     datasetName);
 
-            //Dataset exists ?
+            // Does the dataset exist?
             if (ds == null) {
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + 
" in dataverse " + dataverseName);
             }
-            //Dataset external ?
+            // Is the dataset external?
             if (ds.getDatasetType() != DatasetType.EXTERNAL) {
                 throw new AlgebricksException(
                         "dataset " + datasetName + " in dataverse " + 
dataverseName + " is not an external dataset");
             }
-            //Dataset has indexes ?
+            // Does the dataset have indexes?
             indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, 
dataverseName, datasetName);
             if (indexes.size() == 0) {
                 throw new AlgebricksException("External dataset " + 
datasetName + " in dataverse " + dataverseName
                         + " doesn't have any index");
             }
 
-            //Record transaction time
+            // Record transaction time
             Date txnTime = new Date();
 
-            //refresh lock here
+            // refresh lock here
             ExternalDatasetsRegistry.INSTANCE.refreshBegin(ds);
             lockAquired = true;
 
-            //Get internal files
+            // Get internal files
             metadataFiles = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, ds);
             deletedFiles = new ArrayList<ExternalFile>();
             addedFiles = new ArrayList<ExternalFile>();
             appendedFiles = new ArrayList<ExternalFile>();
 
-            //Compute delta
-            //Now we compare snapshot with external file system
+            // Compute delta
+            // Now we compare snapshot with external file system
             if (ExternalIndexingOperations.isDatasetUptodate(ds, 
metadataFiles, addedFiles, deletedFiles,
                     appendedFiles)) {
                 ((ExternalDatasetDetails) 
ds.getDatasetDetails()).setRefreshTimestamp(txnTime);
                 MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                //latch will be released in the finally clause
+                // latch will be released in the finally clause
                 return;
             }
 
-            //At this point, we know data has changed in the external file 
system, record
-            //transaction in metadata and start
+            // At this point, we know the data has changed in the external file system;
+            // record the transaction in metadata and start
             transactionDataset = 
ExternalIndexingOperations.createTransactionDataset(ds);
             /*
              * Remove old dataset record and replace it with a new one
              */
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, 
transactionDataset);
 
-            //Add delta files to the metadata
+            // Add delta files to the metadata
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
@@ -2700,7 +2707,7 @@
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //Create the files index update job
+            // Create the files index update job
             spec = ExternalIndexingOperations.buildFilesIndexUpdateOp(ds, 
metadataFiles, deletedFiles, addedFiles,
                     appendedFiles, metadataProvider);
 
@@ -2708,22 +2715,22 @@
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.BEGIN;
 
-            //run the files update job
+            // run the files update job
             JobUtils.runJob(hcc, spec, true);
 
             for (Index index : indexes) {
                 if (!ExternalIndexingOperations.isFileIndex(index)) {
                     spec = ExternalIndexingOperations.buildIndexUpdateOp(ds, 
index, metadataFiles, deletedFiles,
                             addedFiles, appendedFiles, metadataProvider);
-                    //run the files update job
+                    // run the files update job
                     JobUtils.runJob(hcc, spec, true);
                 }
             }
 
-            //all index updates has completed successfully, record transaction 
state
+            // all index updates have completed successfully; record the transaction state
             spec = ExternalIndexingOperations.buildCommitJob(ds, indexes, 
metadataProvider);
 
-            //Aquire write latch again -> start a transaction and record the 
decision to commit
+            // Acquire the write latch again -> start a transaction and record the decision to commit
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2734,9 +2741,9 @@
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             transactionState = ExternalDatasetTransactionState.READY_TO_COMMIT;
-            //We don't release the latch since this job is expected to be quick
+            // We don't release the latch since this job is expected to be 
quick
             JobUtils.runJob(hcc, spec, true);
-            //Start a new metadata transaction to record the final state of 
the transaction
+            // Start a new metadata transaction to record the final state of 
the transaction
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
             bActiveTxn = true;
@@ -2749,11 +2756,11 @@
                     while (iterator.hasNext()) {
                         ExternalFile appendedFile = iterator.next();
                         if 
(file.getFileName().equals(appendedFile.getFileName())) {
-                            //delete existing file
+                            // delete existing file
                             
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                            //delete existing appended file
+                            // delete existing appended file
                             
MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, appendedFile);
-                            //add the original file with appended information
+                            // add the original file with appended information
                             appendedFile.setFileNumber(file.getFileNumber());
                             
appendedFile.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                             MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, 
appendedFile);
@@ -2763,24 +2770,24 @@
                 }
             }
 
-            //remove the deleted files delta
+            // remove the deleted files delta
             for (ExternalFile file : deletedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
             }
 
-            //insert new files
+            // insert new files
             for (ExternalFile file : addedFiles) {
                 MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
                 file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
                 MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
             }
 
-            //mark the transaction as complete
+            // mark the transaction as complete
             ((ExternalDatasetDetails) transactionDataset.getDatasetDetails())
                     .setState(ExternalDatasetTransactionState.COMMIT);
             MetadataManager.INSTANCE.updateDataset(mdTxnCtx, 
transactionDataset);
 
-            //commit metadata transaction
+            // commit metadata transaction
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             success = true;
         } catch (Exception e) {
@@ -2792,12 +2799,12 @@
                         + datasetName + ") refresh couldn't carry out the 
commit phase", e);
             }
             if (transactionState == ExternalDatasetTransactionState.COMMIT) {
-                //Nothing to do , everything should be clean
+                // Nothing to do, everything should be clean
                 throw e;
             }
             if (transactionState == ExternalDatasetTransactionState.BEGIN) {
-                //transaction failed, need to do the following
-                //clean NCs removing transaction components
+                // transaction failed, need to do the following
+                // clean the NCs by removing the transaction components
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 metadataProvider.setMetadataTxnContext(mdTxnCtx);
@@ -2807,12 +2814,12 @@
                 try {
                     JobUtils.runJob(hcc, spec, true);
                 } catch (Exception e2) {
-                    //This should never happen -- fix throw illegal
+                    // This should never happen -- rethrow as an IllegalStateException
                     e.addSuppressed(e2);
                     throw new IllegalStateException("System is in inconsistent 
state. Failed to abort refresh", e);
                 }
-                //remove the delta of files
-                //return the state of the dataset to committed
+                // remove the delta of files
+                // return the state of the dataset to committed
                 try {
                     mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                     for (ExternalFile file : deletedFiles) {
@@ -2825,7 +2832,7 @@
                         MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, 
file);
                     }
                     MetadataManager.INSTANCE.updateDataset(mdTxnCtx, ds);
-                    //commit metadata transaction
+                    // commit metadata transaction
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                 } catch (Exception e2) {
                     abort(e, e2, mdTxnCtx);
@@ -2878,20 +2885,20 @@
                     datasetNameFrom, datasetNameTo, mdTxnCtx);
 
             String pregelixHomeKey = "PREGELIX_HOME";
-            //Finds PREGELIX_HOME in system environment variables.
+            // Finds PREGELIX_HOME in system environment variables.
             String pregelixHome = System.getenv(pregelixHomeKey);
-            //Finds PREGELIX_HOME in Java properties.
+            // Finds PREGELIX_HOME in Java properties.
             if (pregelixHome == null) {
                 pregelixHome = System.getProperty(pregelixHomeKey);
             }
-            //Finds PREGELIX_HOME in AsterixDB configuration.
+            // Finds PREGELIX_HOME in AsterixDB configuration.
             if (pregelixHome == null) {
-                //Since there is a default value for PREGELIX_HOME in 
AsterixCompilerProperties,
-                //pregelixHome can never be null.
+                // Since there is a default value for PREGELIX_HOME in 
AsterixCompilerProperties,
+                // pregelixHome can never be null.
                 pregelixHome = 
AsterixAppContextInfo.getInstance().getCompilerProperties().getPregelixHome();
             }
 
-            //Constructs the pregelix command line.
+            // Constructs the pregelix command line.
             List<String> cmd = constructPregelixCommand(pregelixStmt, 
dataverseNameFrom, datasetNameFrom,
                     dataverseNameTo, datasetNameTo);
             ProcessBuilder pb = new ProcessBuilder(cmd);
@@ -2900,9 +2907,9 @@
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
-            //Executes the Pregelix command.
+            // Executes the Pregelix command.
             int resultState = executeExternalShellProgram(pb);
-            //Checks the return state of the external Pregelix command.
+            // Checks the return state of the external Pregelix command.
             if (resultState != 0) {
                 throw new AlgebricksException(
                         "Something went wrong executing your Pregelix Job. 
Perhaps the Pregelix cluster needs to be restarted. "
@@ -2920,12 +2927,12 @@
         }
     }
 
-    //Prepares to run a program on external runtime.
+    // Prepares to run a program on external runtime.
     private void prepareRunExternalRuntime(AqlMetadataProvider 
metadataProvider, IHyracksClientConnection hcc,
             RunStatement pregelixStmt, String dataverseNameFrom, String 
dataverseNameTo, String datasetNameFrom,
             String datasetNameTo, MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException, AsterixException, Exception {
-        //Validates the source/sink dataverses and datasets.
+        // Validates the source/sink dataverses and datasets.
         Dataset fromDataset = metadataProvider.findDataset(dataverseNameFrom, 
datasetNameFrom);
         if (fromDataset == null) {
             throw new AsterixException("The source dataset " + datasetNameFrom 
+ " in dataverse " + dataverseNameFrom
@@ -2938,7 +2945,7 @@
         }
 
         try {
-            //Find the primary index of the sink dataset.
+            // Find the primary index of the sink dataset.
             Index toIndex = null;
             List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseNameTo,
                     pregelixStmt.getDatasetNameTo().getValue());
@@ -2951,7 +2958,7 @@
             if (toIndex == null) {
                 throw new AlgebricksException("Tried to access non-existing 
dataset: " + datasetNameTo);
             }
-            //Cleans up the sink dataset -- Drop and then Create.
+            // Cleans up the sink dataset -- Drop and then Create.
             DropStatement dropStmt = new DropStatement(new 
Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(),
                     true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
@@ -2970,12 +2977,12 @@
             throw new AlgebricksException("Error cleaning the result dataset. 
This should not happen.");
         }
 
-        //Flushes source dataset.
+        // Flushes source dataset.
         FlushDatasetUtils.flushDataset(hcc, metadataProvider, mdTxnCtx, 
dataverseNameFrom, datasetNameFrom,
                 datasetNameFrom);
     }
 
-    //Executes external shell commands.
+    // Executes external shell commands.
     private int executeExternalShellProgram(ProcessBuilder pb)
             throws IOException, AlgebricksException, InterruptedException {
         Process process = pb.start();
@@ -3001,15 +3008,15 @@
             }
             process.waitFor();
         }
-        //Gets the exit value of the program.
+        // Gets the exit value of the program.
         int resultState = process.exitValue();
         return resultState;
     }
 
-    //Constructs a Pregelix command line.
+    // Constructs a Pregelix command line.
     private List<String> constructPregelixCommand(RunStatement pregelixStmt, 
String fromDataverseName,
             String fromDatasetName, String toDataverseName, String 
toDatasetName) {
-        //Constructs AsterixDB parameters, e.g., URL, source dataset and sink 
dataset.
+        // Constructs AsterixDB parameters, e.g., URL, source dataset and sink 
dataset.
         AsterixExternalProperties externalProperties = 
AsterixAppContextInfo.getInstance().getExternalProperties();
         AsterixClusterProperties clusterProperties = 
AsterixClusterProperties.INSTANCE;
         String clientIP = 
clusterProperties.getCluster().getMasterNode().getClientIp();
@@ -3024,7 +3031,7 @@
         asterixdbParameterBuilder.append("pregelix.asterixdb.output.dataset=" 
+ toDatasetName + ",");
         
asterixdbParameterBuilder.append("pregelix.asterixdb.output.cleanup=false,");
 
-        //construct command
+        // construct command
         List<String> cmds = new ArrayList<String>();
         cmds.add("bin/pregelix");
         cmds.add(pregelixStmt.getParameters().get(0)); // jar
@@ -3037,7 +3044,7 @@
         String outputConverterClassValue = 
"=org.apache.pregelix.example.converter.VLongIdOutputVertexConverter,";
         boolean custPropAdded = false;
         boolean meetCustProp = false;
-        //User parameters.
+        // User parameters.
         for (String s : pregelixStmt.getParameters().get(2).split(" ")) {
             if (meetCustProp) {
                 if (!s.contains(inputConverterClassKey)) {
@@ -3059,10 +3066,10 @@
 
         if (!custPropAdded) {
             cmds.add(customizedPregelixProperty);
-            //Appends default converter classes to asterixdbParameterBuilder.
+            // Appends default converter classes to asterixdbParameterBuilder.
             asterixdbParameterBuilder.append(inputConverterClassKey + 
inputConverterClassValue);
             asterixdbParameterBuilder.append(outputConverterClassKey + 
outputConverterClassValue);
-            //Remove the last comma.
+            // Remove the last comma.
             
asterixdbParameterBuilder.delete(asterixdbParameterBuilder.length() - 1,
                     asterixdbParameterBuilder.length());
             cmds.add(asterixdbParameterBuilder.toString());
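
The QueryTranslator changes above are where the connect and disconnect statements become atomic: connect now blocks until FEED_COLLECT_STARTED is reported (and, when WAIT_FOR_COMPLETION is set, until FEED_COLLECT_ENDED), and disconnect blocks on FEED_COLLECT_ENDED before it returns. A minimal sketch of such a blocking event subscriber, using only JDK types and illustrative names rather than the AsterixDB classes:

    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative sketch only, not the AsterixDB subscriber: assertEvent blocks the
    // caller until the expected lifecycle event (or a failure event) is delivered.
    class BlockingFeedEventSubscriber {
        enum Event { COLLECT_STARTED, COLLECT_ENDED, INTAKE_ENDED, INTAKE_FAILURE, COLLECT_FAILURE }

        private final LinkedBlockingQueue<Event> inbox = new LinkedBlockingQueue<>();

        // Called from the job-notification side when something happens to the feed.
        void handleEvent(Event event) {
            inbox.add(event);
        }

        // Called from the statement handler; returns only once the expected event arrives.
        void assertEvent(Event expected) throws InterruptedException {
            while (true) {
                Event event = inbox.take(); // blocking call
                if (event == expected) {
                    return;
                }
                if (event == Event.INTAKE_FAILURE || event == Event.COLLECT_FAILURE) {
                    throw new IllegalStateException("feed failed while waiting for " + expected);
                }
                // any other event: keep waiting for the expected one
            }
        }
    }

Under this pattern the disconnect handler runs the disconnect job and then waits on the collect-ended event, which is why the statement can no longer return while the connect job is still alive.
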
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 2378032..9dd4025 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -37,7 +37,7 @@
 import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.ICCMessageBroker;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -91,8 +91,8 @@
     }
 
     private void handleFeedProviderReady(IMessage message) {
-        FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message;
-        FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), 
msg.getJobId());
+        FeedPartitionStartMessage msg = (FeedPartitionStartMessage) message;
+        FeedLifecycleListener.INSTANCE.notifyPartitionStart(msg.getFeedId(), 
msg.getJobId());
     }
 
     private synchronized void handleResourceIdRequest(IMessage message, String 
nodeId) throws Exception {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/repeatedtestsuite.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/repeatedtestsuite.xml
index f99bfed..51ebf39 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/repeatedtestsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/repeatedtestsuite.xml
@@ -24,6 +24,11 @@
     <!-- By default no test are listed here. This is only for individual 
testing purposes. -->
     <!-- The following test case is commented out as an example. -->
     <test-group name="repeated-tests">
+        <test-case FilePath="feeds">
+            <compilation-unit name="drop-dataverse-with-disconnected-feed">
+                <output-dir 
compare="Text">drop-dataverse-with-disconnected-feed</output-dir>
+            </compilation-unit>
+        </test-case>
 <!--
         <test-case FilePath="records/get-record-fields">
             <compilation-unit name="tiny-social-example">
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
index 7b74ef9..0dd87d6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java
@@ -22,29 +22,19 @@
 
 public class FeedOperationCounter {
     private FeedJobInfo feedJobInfo;
-    private int providersCount;
-    private int jobsCount;
+    private int partitionCount;
     private boolean failedIngestion = false;
 
-    public FeedOperationCounter(int providersCount, int jobsCount) {
-        this.providersCount = providersCount;
-        this.jobsCount = jobsCount;
+    public FeedOperationCounter(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
-    public int getProvidersCount() {
-        return providersCount;
+    public int getPartitionCount() {
+        return partitionCount;
     }
 
-    public void setProvidersCount(int providersCount) {
-        this.providersCount = providersCount;
-    }
-
-    public int getJobsCount() {
-        return jobsCount;
-    }
-
-    public void setJobsCount(int jobsCount) {
-        this.jobsCount = jobsCount;
+    public void setPartitionCount(int partitionCount) {
+        this.partitionCount = partitionCount;
     }
 
     public boolean isFailedIngestion() {
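
FeedOperationCounter is reduced to a single partitionCount. One plausible way such a counter is consumed, sketched here with plain JDK types (hypothetical names, not the actual notification-handler code): each incoming partition-start message decrements the count, and only when it reaches zero is the intake treated as fully started.

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical sketch, not the actual handler: count down as each intake
    // partition reports that it has started.
    class IntakeStartTracker {
        private final AtomicInteger remaining;

        IntakeStartTracker(int partitionCount) {
            this.remaining = new AtomicInteger(partitionCount);
        }

        // Invoked once per partition-start message; returns true exactly once,
        // when the last partition has started and subscriptions may proceed.
        boolean partitionStarted() {
            return remaining.decrementAndGet() == 0;
        }
    }
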
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
index 0c8724e..ad3c1c9 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleEventSubscriber.java
@@ -27,7 +27,8 @@
         FEED_COLLECT_STARTED,
         FEED_INTAKE_FAILURE,
         FEED_COLLECT_FAILURE,
-        FEED_ENDED
+        FEED_INTAKE_ENDED,
+        FEED_COLLECT_ENDED
     }
 
     public void assertEvent(FeedLifecycleEvent event) throws AsterixException, 
InterruptedException;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
index 28b713e..fcd3cd8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/IFeedLifecycleListener.java
@@ -39,12 +39,12 @@
 
     public List<String> getStoreLocations(FeedConnectionId feedId);
 
-    public void registerFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber);
+    public boolean registerFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber);
 
     public void deregisterFeedEventSubscriber(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber subscriber);
 
     public List<String> getCollectLocations(FeedConnectionId feedConnectionId);
 
-    boolean isFeedConnectionActive(FeedConnectionId connectionId);
+    boolean isFeedConnectionActive(FeedConnectionId connectionId, 
IFeedLifecycleEventSubscriber eventSubscriber);
 
 }
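
Because registerFeedEventSubscriber now returns whether the subscriber was actually registered, error-handling code can deregister only what it registered. A hedged sketch of that call pattern against the interface above (the class and method names here are illustrative, not code from this change):

    import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber;
    import org.apache.asterix.external.feed.api.IFeedLifecycleListener;
    import org.apache.asterix.external.feed.management.FeedConnectionId;

    // Illustrative only: the boolean return lets cleanup undo exactly what was done.
    class ConnectCleanupSketch {
        static void connectWithCleanup(IFeedLifecycleListener listener, FeedConnectionId connectionId,
                IFeedLifecycleEventSubscriber subscriber) {
            boolean subscriberRegistered = false;
            try {
                subscriberRegistered = listener.registerFeedEventSubscriber(connectionId, subscriber);
                // ... submit the connect job and wait on lifecycle events here ...
            } finally {
                if (subscriberRegistered) {
                    listener.deregisterFeedEventSubscriber(connectionId, subscriber);
                }
            }
        }
    }
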
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
similarity index 90%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
rename to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
index 4c81c5b..49b23ed 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedPartitionStartMessage.java
@@ -22,13 +22,13 @@
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.hyracks.api.job.JobId;
 
-public class FeedProviderReadyMessage extends AbstractApplicationMessage {
+public class FeedPartitionStartMessage extends AbstractApplicationMessage {
 
     private static final long serialVersionUID = 1L;
     private final FeedId feedId;
     private final JobId jobId;
 
-    public FeedProviderReadyMessage(FeedId feedId, JobId jobId) {
+    public FeedPartitionStartMessage(FeedId feedId, JobId jobId) {
         this.feedId = feedId;
         this.jobId = jobId;
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
index 3e42169..b69a7b3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedConnectJobInfo.java
@@ -36,6 +36,7 @@
     private List<String> collectLocations;
     private List<String> computeLocations;
     private List<String> storageLocations;
+    private int partitionStarts = 0;
 
     public FeedConnectJobInfo(JobId jobId, FeedJobState state, 
FeedConnectionId connectionId,
             IFeedJoint sourceFeedJoint, IFeedJoint computeFeedJoint, 
JobSpecification spec,
@@ -91,4 +92,12 @@
         this.computeFeedJoint = computeFeedJoint;
     }
 
+    public void partitionStart() {
+        partitionStarts++;
+    }
+
+    public boolean collectionStarted() {
+        return partitionStarts == collectLocations.size();
+    }
+
 }
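
FeedConnectJobInfo now counts partition starts against the number of collect locations, so the connect job is only reported as started once every collect partition has checked in. The same idea in a standalone, hypothetical form:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical, self-contained version of the counting above: the connect is
    // considered started only once every collect location has reported a partition start.
    class ConnectStartTracker {
        private final List<String> collectLocations;
        private int partitionStarts = 0;

        ConnectStartTracker(List<String> collectLocations) {
            this.collectLocations = collectLocations;
        }

        void partitionStart() {
            partitionStarts++;
        }

        boolean collectionStarted() {
            return partitionStarts == collectLocations.size();
        }

        public static void main(String[] args) {
            ConnectStartTracker tracker = new ConnectStartTracker(Arrays.asList("nc1", "nc2"));
            tracker.partitionStart();
            System.out.println(tracker.collectionStarted()); // false: one of two partitions started
            tracker.partitionStart();
            System.out.println(tracker.collectionStarted()); // true: all collect partitions started
        }
    }
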
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 87e1edb..178d2d5 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -34,6 +34,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.feed.management.FeedId;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
@@ -85,6 +86,10 @@
             switch (((SubscribableFeedRuntimeId) 
sourceRuntime.getRuntimeId()).getFeedRuntimeType()) {
                 case INTAKE:
                     handleCompleteConnection();
+                    // Notify the CC that collection has started
+                    ctx.sendApplicationMessageToCC(
+                            new 
FeedPartitionStartMessage(connectionId.getFeedId(), 
ctx.getJobletContext().getJobId()),
+                            null);
                     break;
                 case COMPUTE:
                     handlePartialConnection();
@@ -93,7 +98,6 @@
                     throw new IllegalStateException("Invalid source type "
                             + ((SubscribableFeedRuntimeId) 
sourceRuntime.getRuntimeId()).getFeedRuntimeType());
             }
-
             State state = collectRuntime.waitTillCollectionOver();
             if (state.equals(State.FINISHED)) {
                 
feedManager.getFeedConnectionManager().deRegisterFeedRuntime(connectionId,
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 6771010..cd20900 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -36,7 +36,7 @@
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
-import org.apache.asterix.external.feed.message.FeedProviderReadyMessage;
+import org.apache.asterix.external.feed.message.FeedPartitionStartMessage;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
@@ -117,7 +117,7 @@
                         adapterRuntimeManager, ctx);
                 
feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 // Notify FeedJobNotificationHandler that this provider is 
ready to receive subscription requests.
-                ctx.sendApplicationMessageToCC(new 
FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()),
+                ctx.sendApplicationMessageToCC(new 
FeedPartitionStartMessage(feedId, ctx.getJobletContext().getJobId()),
                         null);
                 feedFrameWriter.open();
             } else {
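
Taken together, both the intake and the collect operators now send a FeedPartitionStartMessage to the CC when their partition comes up, and the CC releases the waiting connect only after every partition has reported. A self-contained, hypothetical sketch of that handshake using plain JDK threads (the partition count and all names are made up for illustration):

    import java.util.concurrent.CountDownLatch;

    // Hypothetical end-to-end sketch of the handshake these operators take part in:
    // every partition reports "started", and the waiting connect returns only after
    // all partitions have reported.
    public class FeedStartHandshakeSketch {
        public static void main(String[] args) throws InterruptedException {
            int partitions = 3; // assumed partition count for the sketch
            CountDownLatch allPartitionsStarted = new CountDownLatch(partitions);

            // Partition side: each runtime reports once it is up (stand-in for the
            // FeedPartitionStartMessage sent to the CC).
            for (int i = 0; i < partitions; i++) {
                final int partition = i;
                new Thread(() -> {
                    System.out.println("partition " + partition + " started");
                    allPartitionsStarted.countDown();
                }).start();
            }

            // Statement side: block until every partition has reported in.
            allPartitionsStarted.await();
            System.out.println("connect complete: all partitions are up");
        }
    }
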

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/792
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib2778b4d7f156c7e06ac9f561a26783c4933a22c
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
