Steven Jacobs has submitted this change and it was merged.

Change subject: Allow BAD jobs to update their specifications to use new indexes
......................................................................


Allow BAD jobs to update their specifications to use new indexes

- storage format changes: new field for Channel body

This change uses the Asterix upsertDeployedJobSpec to
recompile and update the channel job when new indexes are
created.

Added test case
Moved methods from Asterix DeployedJobService to BADJobService

Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
---
M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
A asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
D asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
A asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
M 
asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
M 
asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
M 
asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
26 files changed, 1,343 insertions(+), 307 deletions(-)

Approvals:
  Jenkins: Verified
  Xikui Wang: Looks good to me, approved



diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
index d2d0fa3..d422663 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java
@@ -50,6 +50,7 @@
     String FIELD_NAME_RETURN_TYPE = "ReturnType";
     String FIELD_NAME_DEFINITION = "Definition";
     String FIELD_NAME_LANGUAGE = "Language";
+    String FIELD_NAME_BODY = "Body";
     //To enable new Asterix TxnId for separate deployed job spec invocations
     byte[] TRANSACTION_ID_PARAMETER_NAME = "TxnIdParameter".getBytes();
     int EXECUTOR_TIMEOUT = 20;
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
new file mode 100644
index 0000000..e326ce6
--- /dev/null
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad;
+
+import java.io.StringReader;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveNotificationHandler;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.lang.BADParserFactory;
+import org.apache.asterix.bad.lang.BADStatementExecutor;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.transactions.ITxnIdFactory;
+import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.Query;
+import org.apache.asterix.lang.common.statement.SetStatement;
+import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IRequestParameters;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.job.DeployedJobSpecId;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Provides functionality for channel jobs
+ */
+public class BADJobService {
+
+    private static final Logger LOGGER = 
Logger.getLogger(BADJobService.class.getName());
+
+    //pool size one (only running one thread at a time)
+    private static final int POOL_SIZE = 1;
+
+    private static final long millisecondTimeout = 
BADConstants.EXECUTOR_TIMEOUT * 1000;
+
+    //Starts running a deployed job specification periodically with an 
interval of "period" seconds
+    public static ScheduledExecutorService 
startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId,
+            IHyracksClientConnection hcc, long period, Map<byte[], byte[]> 
jobParameters, EntityId entityId,
+            ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) 
{
+        ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(POOL_SIZE);
+        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    if (!runDeployedJobSpecCheckPeriod(distributedId, hcc, 
jobParameters, period, entityId,
+                            txnIdFactory, listener)) {
+                        scheduledExecutorService.shutdown();
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Job Failed to run for " + 
entityId.getExtensionName() + " "
+                            + entityId.getDataverse() + "." + 
entityId.getEntityName() + ".", e);
+                }
+            }
+        }, period, period, TimeUnit.MILLISECONDS);
+        return scheduledExecutorService;
+    }
+
+    public static boolean runDeployedJobSpecCheckPeriod(DeployedJobSpecId 
distributedId, IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, long period, EntityId entityId, 
ITxnIdFactory txnIdFactory,
+            DeployedJobSpecEventListener listener) throws Exception {
+        long executionMilliseconds =
+                runDeployedJobSpec(distributedId, hcc, jobParameters, 
entityId, txnIdFactory, null, listener, null);
+        if (executionMilliseconds > period) {
+            LOGGER.log(Level.SEVERE,
+                    "Periodic job for " + entityId.getExtensionName() + " " + 
entityId.getDataverse() + "."
+                            + entityId.getEntityName() + " was unable to meet 
the required period of " + period
+                            + " milliseconds. Actually took " + 
executionMilliseconds + " execution will shutdown"
+                            + new Date());
+            return false;
+        }
+        return true;
+    }
+
+    public static long runDeployedJobSpec(DeployedJobSpecId distributedId, 
IHyracksClientConnection hcc,
+            Map<byte[], byte[]> jobParameters, EntityId entityId, 
ITxnIdFactory txnIdFactory,
+            ICcApplicationContext appCtx, DeployedJobSpecEventListener 
listener, QueryTranslator statementExecutor)
+            throws Exception {
+        listener.waitWhileAtState(ActivityState.SUSPENDED);
+
+        //Add the Asterix Transaction Id to the map
+        jobParameters.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
+                String.valueOf(txnIdFactory.create().getId()).getBytes());
+
+        long startTime = Instant.now().toEpochMilli();
+        JobId jobId = hcc.startJob(distributedId, jobParameters);
+
+        hcc.waitForCompletion(jobId);
+        long executionMilliseconds = Instant.now().toEpochMilli() - startTime;
+
+        if (listener.getType() == 
DeployedJobSpecEventListener.PrecompiledType.QUERY) {
+            ResultReader resultReader = new 
ResultReader(listener.getResultDataset(), jobId, listener.getResultId());
+
+            ResultUtil.printResults(appCtx, resultReader, 
statementExecutor.getSessionOutput(),
+                    new IStatementExecutor.Stats(), null);
+        }
+
+        LOGGER.log(Level.SEVERE,
+                "Deployed Job execution completed for " + 
entityId.getExtensionName() + " " + entityId.getDataverse()
+                        + "." + entityId.getEntityName() + ". Took " + 
executionMilliseconds + " milliseconds ");
+
+        return executionMilliseconds;
+
+    }
+
+
+    public static long findPeriod(String duration) {
+        //TODO: Allow Repetitive Channels to use YMD durations
+        String hoursMinutesSeconds = "";
+        if (duration.indexOf('T') != -1) {
+            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 
1);
+        }
+        double seconds = 0;
+        if (hoursMinutesSeconds != "") {
+            int pos = 0;
+            if (hoursMinutesSeconds.indexOf('H') != -1) {
+                Double hours = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('H')));
+                seconds += (hours * 60 * 60);
+                pos = hoursMinutesSeconds.indexOf('H') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('M') != -1) {
+                Double minutes =
+                        Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('M')));
+                seconds += (minutes * 60);
+                pos = hoursMinutesSeconds.indexOf('M') + 1;
+            }
+            if (hoursMinutesSeconds.indexOf('S') != -1) {
+                Double s = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('S')));
+                seconds += (s);
+            }
+        }
+        return (long) (seconds * 1000);
+    }
+
+    public static JobSpecification compilePushChannel(IStatementExecutor 
statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, 
Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            LOGGER.log(Level.INFO, e.getMessage(), e);
+            if (bActiveTxn) {
+                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            }
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+        return jobSpec;
+    }
+
+    public static void redeployJobSpec(EntityId entityId, String 
queryBodyString, MetadataProvider metadataProvider,
+            BADStatementExecutor badStatementExecutor, 
IHyracksClientConnection hcc,
+            IRequestParameters requestParameters) throws Exception {
+
+        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        DeployedJobSpecEventListener listener =
+                (DeployedJobSpecEventListener) 
activeEventHandler.getListener(entityId);
+        if (listener == null) {
+            LOGGER.severe("Tried to redeploy the job for " + entityId + " but 
no listener exists.");
+            return;
+        }
+
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new 
StringReader(queryBodyString)).parse();
+        JobSpecification jobSpec = null;
+        if 
(listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL)
+                || 
listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.CHANNEL))
 {
+            //Channels
+            SetStatement ss = (SetStatement) fStatements.get(0);
+            metadataProvider.getConfig().put(ss.getPropName(), 
ss.getPropValue());
+            if 
(listener.getType().equals(DeployedJobSpecEventListener.PrecompiledType.PUSH_CHANNEL))
 {
+                jobSpec = compilePushChannel(badStatementExecutor, 
metadataProvider, hcc, (Query) fStatements.get(1));
+            } else {
+                jobSpec = 
badStatementExecutor.handleInsertUpsertStatement(metadataProvider, 
fStatements.get(1), hcc,
+                        null, null, null, null, true, null);
+            }
+        } else {
+            //Procedures
+            metadataProvider.setResultSetId(listener.getResultId());
+            final IStatementExecutor.ResultDelivery resultDelivery =
+                    requestParameters.getResultProperties().getDelivery();
+            final IHyracksDataset hdc = requestParameters.getHyracksDataset();
+            final IStatementExecutor.Stats stats = 
requestParameters.getStats();
+            boolean resultsAsync = resultDelivery == 
IStatementExecutor.ResultDelivery.ASYNC
+                    || resultDelivery == 
IStatementExecutor.ResultDelivery.DEFERRED;
+            metadataProvider.setResultAsyncMode(resultsAsync);
+            metadataProvider.setMaxResultReads(1);
+
+            jobSpec = compileProcedureJob(badStatementExecutor, 
metadataProvider, hcc, hdc, stats, fStatements.get(1));
+
+        }
+        hcc.upsertDeployedJobSpec(listener.getDeployedJobSpecId(), jobSpec);
+
+        listener.resume();
+
+    }
+
+    public static JobSpecification compileQueryJob(IStatementExecutor 
statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, 
Query q) throws Exception {
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        boolean bActiveTxn = true;
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = null;
+        try {
+            jobSpec = statementExecutor.rewriteCompileQuery(hcc, 
metadataProvider, q, null);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+            bActiveTxn = false;
+        } catch (Exception e) {
+            ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
+            throw e;
+        }
+        return jobSpec;
+    }
+
+    private static JobSpecification compileProcedureJob(IStatementExecutor 
statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, 
IHyracksDataset hdc,
+            IStatementExecutor.Stats stats, Statement procedureStatement) 
throws Exception {
+        if (procedureStatement.getKind() == Statement.Kind.INSERT) {
+            return ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider,
+                    procedureStatement, hcc, hdc, 
IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null);
+        } else if (procedureStatement.getKind() == Statement.Kind.QUERY) {
+            return compileQueryJob(statementExecutor, metadataProvider, hcc, 
(Query) procedureStatement);
+        } else {
+            SqlppDeleteRewriteVisitor visitor = new 
SqlppDeleteRewriteVisitor();
+            procedureStatement.accept(visitor, null);
+            return ((QueryTranslator) 
statementExecutor).handleDeleteStatement(metadataProvider, procedureStatement,
+                    hcc, true);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "BADJobService";
+    }
+
+}
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
deleted file mode 100644
index 3df9a76..0000000
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad;
-
-import java.util.logging.Logger;
-
-
-/**
- * Provides functionality for channel jobs
- */
-public class ChannelJobService {
-
-    private static final Logger LOGGER = 
Logger.getLogger(ChannelJobService.class.getName());
-
-
-    public static long findPeriod(String duration) {
-        //TODO: Allow Repetitive Channels to use YMD durations
-        String hoursMinutesSeconds = "";
-        if (duration.indexOf('T') != -1) {
-            hoursMinutesSeconds = duration.substring(duration.indexOf('T') + 
1);
-        }
-        double seconds = 0;
-        if (hoursMinutesSeconds != "") {
-            int pos = 0;
-            if (hoursMinutesSeconds.indexOf('H') != -1) {
-                Double hours = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('H')));
-                seconds += (hours * 60 * 60);
-                pos = hoursMinutesSeconds.indexOf('H') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('M') != -1) {
-                Double minutes =
-                        Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('M')));
-                seconds += (minutes * 60);
-                pos = hoursMinutesSeconds.indexOf('M') + 1;
-            }
-            if (hoursMinutesSeconds.indexOf('S') != -1) {
-                Double s = 
Double.parseDouble(hoursMinutesSeconds.substring(pos, 
hoursMinutesSeconds.indexOf('S')));
-                seconds += (s);
-            }
-        }
-        return (long) (seconds * 1000);
-    }
-
-
-    @Override
-    public String toString() {
-        return "ChannelJobService";
-    }
-
-}
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index 8c7143f..4ab7530 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -18,22 +18,27 @@
  */
 package org.apache.asterix.bad.lang;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.app.translator.RequestParameters;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
 import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
 import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
 import org.apache.asterix.bad.metadata.Broker;
 import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.compiler.provider.ILangCompilationProvider;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DataverseDropStatement;
 import org.apache.asterix.lang.common.statement.DropDatasetStatement;
 import org.apache.asterix.lang.common.statement.FunctionDropStatement;
@@ -42,9 +47,12 @@
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.SessionOutput;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 
 public class BADStatementExecutor extends QueryTranslator {
@@ -56,16 +64,21 @@
 
     //TODO: Most of this file could go away if we had metadata dependencies
 
-    private void checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx, 
String dataverse, String dataset)
-            throws CompilationException, AlgebricksException {
+    private Pair<List<Channel>, List<Procedure>> 
checkIfDatasetIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dataverse, String dataset, boolean checkAll) throws 
AlgebricksException {
+        List<Channel> channelsUsingDataset = new ArrayList<>();
+        List<Procedure> proceduresUsingDataset = new ArrayList<>();
         List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
         for (Channel channel : channels) {
             List<List<List<String>>> dependencies = channel.getDependencies();
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && 
dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + 
dataverse + "." + dataset + ". "
-                            + channel.getChannelId() + " depends on it!");
+                    channelsUsingDataset.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, 
proceduresUsingDataset);
+                    }
+
                 }
             }
 
@@ -76,11 +89,81 @@
             List<List<String>> datasetDependencies = dependencies.get(0);
             for (List<String> dependency : datasetDependencies) {
                 if (dependency.get(0).equals(dataverse) && 
dependency.get(1).equals(dataset)) {
-                    throw new CompilationException("Cannot alter dataset " + 
dataverse + "." + dataset + ". "
-                            + procedure.getEntityId() + " depends on it!");
+                    proceduresUsingDataset.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingDataset, 
proceduresUsingDataset);
+                    }
                 }
             }
 
+        }
+        return new Pair<>(channelsUsingDataset, proceduresUsingDataset);
+    }
+
+    private Pair<List<Channel>, List<Procedure>> 
checkIfFunctionIsInUse(MetadataTransactionContext mdTxnCtx,
+            String dvId, String function, String arity, boolean checkAll)
+            throws CompilationException, AlgebricksException {
+        List<Channel> channelsUsingFunction = new ArrayList<>();
+        List<Procedure> proceduresUsingFunction = new ArrayList<>();
+
+        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
+        for (Channel channel : channels) {
+            List<List<List<String>>> dependencies = channel.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    channelsUsingFunction.add(channel);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, 
proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        List<Procedure> procedures = 
BADLangExtension.getAllProcedures(mdTxnCtx);
+        for (Procedure procedure : procedures) {
+            List<List<List<String>>> dependencies = 
procedure.getDependencies();
+            List<List<String>> datasetDependencies = dependencies.get(1);
+            for (List<String> dependency : datasetDependencies) {
+                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
+                        && dependency.get(2).equals(arity)) {
+                    proceduresUsingFunction.add(procedure);
+                    if (!checkAll) {
+                        return new Pair<>(channelsUsingFunction, 
proceduresUsingFunction);
+                    }
+                }
+            }
+
+        }
+        return new Pair<>(channelsUsingFunction, proceduresUsingFunction);
+    }
+
+    private void throwErrorIfDatasetUsed(MetadataTransactionContext mdTxnCtx, 
String dataverse, String dataset)
+            throws CompilationException, AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents = 
checkIfDatasetIsInUse(mdTxnCtx, dataverse, dataset, false);
+        if (dependents.first.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse 
+ "." + dataset + ". "
+                    + dependents.first.get(0).getChannelId() + " depends on 
it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException("Cannot alter dataset " + dataverse 
+ "." + dataset + ". "
+                    + dependents.second.get(0).getEntityId() + " depends on 
it!");
+        }
+    }
+
+    private void throwErrorIfFunctionUsed(MetadataTransactionContext mdTxnCtx, 
String dataverse, String function,
+            String arity, FunctionSignature sig) throws CompilationException, 
AlgebricksException {
+        Pair<List<Channel>, List<Procedure>> dependents =
+                checkIfFunctionIsInUse(mdTxnCtx, dataverse, function, arity, 
false);
+        String errorStart = sig != null ? "Cannot drop function " + sig + "." 
: "Cannot drop index.";
+        if (dependents.first.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.first.get(0).getChannelId() 
+ " depends on it!");
+        }
+        if (dependents.second.size() > 0) {
+            throw new CompilationException(
+                    errorStart + " " + dependents.second.get(0).getEntityId() 
+ " depends on it!");
         }
     }
 
@@ -92,10 +175,85 @@
         String dvId = getActiveDataverse(((DropDatasetStatement) 
stmt).getDataverseName());
         Identifier dsId = ((DropDatasetStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleDatasetDropStatement(metadataProvider, stmt, hcc, 
requestParameters);
+    }
+
+    @Override
+    public void handleCreateIndexStatement(MetadataProvider metadataProvider, 
Statement stmt,
+            IHyracksClientConnection hcc, IRequestParameters 
requestParameters) throws Exception {
+
+        //TODO: Check whether a delete or insert procedure using the index. If 
so, we will need to
+        // disallow the procedure until after the newly distributed version is 
ready
+
+        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        //Allow channels to use the new index
+        String dvId = getActiveDataverse(((CreateIndexStatement) 
stmt).getDataverseName());
+        String dsId = ((CreateIndexStatement) 
stmt).getDatasetName().getValue();
+
+        Pair<List<Channel>, List<Procedure>> usages = 
checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId, true);
+
+        List<Dataverse> dataverseList = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = 
MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : 
function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && 
datasetDependency.get(1).equals(dsId)) {
+                        Pair<List<Channel>, List<Procedure>> functionUsages =
+                                checkIfFunctionIsInUse(mdTxnCtx, 
function.getDataverseName(), function.getName(),
+                                        Integer.toString(function.getArity()), 
true);
+                        for (Channel channel : functionUsages.first) {
+                            if (!usages.first.contains(channel)) {
+                                usages.first.add(channel);
+                            }
+                        }
+                        for (Procedure procedure : functionUsages.second) {
+                            if (!usages.second.contains(procedure)) {
+                                usages.second.add(procedure);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        ActiveNotificationHandler activeEventHandler =
+                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+
+        for (Channel channel : usages.first) {
+            DeployedJobSpecEventListener listener =
+                    (DeployedJobSpecEventListener) 
activeEventHandler.getListener(channel.getChannelId());
+            listener.suspend();
+        }
+        for (Procedure procedure : usages.second) {
+            DeployedJobSpecEventListener listener =
+                    (DeployedJobSpecEventListener) 
activeEventHandler.getListener(procedure.getEntityId());
+            listener.suspend();
+        }
+
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        metadataProvider.getLocks().unlock();
+
+        metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+        super.handleCreateIndexStatement(metadataProvider, stmt, hcc, 
requestParameters);
+
+        for (Channel channel : usages.first) {
+            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            BADJobService.redeployJobSpec(channel.getChannelId(), 
channel.getChannelBody(), metadataProvider, this, hcc,
+                    requestParameters);
+            metadataProvider.getLocks().unlock();
+        }
+        for (Procedure procedure : usages.second) {
+            metadataProvider = new MetadataProvider(appCtx, activeDataverse);
+            BADJobService.redeployJobSpec(procedure.getEntityId(), 
procedure.getBody(), metadataProvider, this, hcc,
+                    requestParameters);
+            metadataProvider.getLocks().unlock();
+        }
+
+
     }
 
     @Override
@@ -106,7 +264,20 @@
         String dvId = getActiveDataverse(((IndexDropStatement) 
stmt).getDataverseName());
         Identifier dsId = ((IndexDropStatement) stmt).getDatasetName();
 
-        checkIfDatasetIsInUse(mdTxnCtx, dvId, dsId.getValue());
+        throwErrorIfDatasetUsed(mdTxnCtx, dvId, dsId.getValue());
+
+        List<Dataverse> dataverseList = 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+        for (Dataverse dv : dataverseList) {
+            List<Function> functions = 
MetadataManager.INSTANCE.getFunctions(mdTxnCtx, dv.getDataverseName());
+            for (Function function : functions) {
+                for (List<String> datasetDependency : 
function.getDependencies().get(0)) {
+                    if (datasetDependency.get(0).equals(dvId) && 
datasetDependency.get(1).equals(dsId.getValue())) {
+                        throwErrorIfFunctionUsed(mdTxnCtx, 
function.getDataverseName(), function.getName(),
+                                Integer.toString(function.getArity()), null);
+                    }
+                }
+            }
+        }
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleIndexDropStatement(metadataProvider, stmt, hcc, 
requestParameters);
@@ -122,32 +293,7 @@
         String function = sig.getName();
         String arity = Integer.toString(sig.getArity());
 
-        List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx);
-        for (Channel channel : channels) {
-            List<List<List<String>>> dependencies = channel.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + 
channel.getChannelId() + " depends on it!");
-                }
-            }
-
-        }
-        List<Procedure> procedures = 
BADLangExtension.getAllProcedures(mdTxnCtx);
-        for (Procedure procedure : procedures) {
-            List<List<List<String>>> dependencies = 
procedure.getDependencies();
-            List<List<String>> datasetDependencies = dependencies.get(1);
-            for (List<String> dependency : datasetDependencies) {
-                if (dependency.get(0).equals(dvId) && 
dependency.get(1).equals(function)
-                        && dependency.get(2).equals(arity)) {
-                    throw new CompilationException(
-                            "Cannot drop function " + sig + ". " + 
procedure.getEntityId() + " depends on it!");
-                }
-            }
-
-        }
+        throwErrorIfFunctionUsed(mdTxnCtx, dvId, function, arity, sig);
 
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         super.handleFunctionDropStatement(metadataProvider, stmt);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 87ac320..22767f2 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -28,13 +28,12 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.Channel;
@@ -57,7 +56,6 @@
 import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
-import org.apache.asterix.lang.common.statement.InsertStatement;
 import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.statement.SetStatement;
@@ -89,7 +87,7 @@
     private final CallExpr period;
     private Identifier dataverseName;
     private String duration;
-    private InsertStatement channelResultsInsertQuery;
+    private String body;
     private String subscriptionsTableName;
     private String resultsTableName;
     private String dataverse;
@@ -131,10 +129,6 @@
 
     public Expression getPeriod() {
         return period;
-    }
-
-    public InsertStatement getChannelResultsInsertQuery() {
-        return channelResultsInsertQuery;
     }
 
     @Override
@@ -221,28 +215,6 @@
 
     }
 
-    private JobSpecification compilePushChannel(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, Query q) throws Exception {
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        JobSpecification jobSpec = null;
-        try {
-            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        } finally {
-            metadataProvider.getLocks().unlock();
-        }
-        return jobSpec;
-    }
-
     private JobSpecification createChannelJob(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
             IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) 
throws Exception {
         StringBuilder builder = new StringBuilder();
@@ -271,13 +243,15 @@
             builder.append(" returning a");
         }
         builder.append(";");
+        body = builder.toString();
         BADParserFactory factory = new BADParserFactory();
         List<Statement> fStatements = factory.createParser(new 
StringReader(builder.toString())).parse();
 
         SetStatement ss = (SetStatement) fStatements.get(0);
         metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
         if (push) {
-            return compilePushChannel(statementExecutor, metadataProvider, 
hcc, (Query) fStatements.get(1));
+            return BADJobService.compilePushChannel(statementExecutor, 
metadataProvider, hcc,
+                    (Query) fStatements.get(1));
         }
         return ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider, 
fStatements.get(1),
                 hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null);
@@ -286,9 +260,10 @@
     private void setupExecutorJob(EntityId entityId, JobSpecification 
channeljobSpec, IHyracksClientConnection hcc,
             DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) 
throws Exception {
         if (channeljobSpec != null) {
+            
channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
 entityId);
             DeployedJobSpecId destributedId = 
hcc.deployJobSpec(channeljobSpec);
-            ScheduledExecutorService ses = 
DeployedJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
-                    ChannelJobService.findPeriod(duration), new HashMap<>(), 
entityId, txnIdFactory);
+            ScheduledExecutorService ses = 
BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc,
+                    BADJobService.findPeriod(duration), new HashMap<>(), 
entityId, txnIdFactory, listener);
             listener.storeDistributedInfo(destributedId, ses, null, null);
         }
 
@@ -354,14 +329,15 @@
 
             // Now we subscribe
             if (listener == null) {
-                listener = new DeployedJobSpecEventListener(appCtx, entityId, 
PrecompiledType.CHANNEL, null,
+                listener = new DeployedJobSpecEventListener(appCtx, entityId,
+                        push ? PrecompiledType.PUSH_CHANNEL : 
PrecompiledType.CHANNEL, null,
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
 
             setupExecutorJob(entityId, channeljobSpec, hcc, listener, 
metadataProvider.getTxnIdFactory());
             channel = new Channel(dataverse, channelName.getValue(), 
subscriptionsTableName, resultsTableName, function,
-                    duration, null);
+                    duration, null, body);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index f3561a4..03db7bc 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,18 +20,23 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
 import 
org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
@@ -42,7 +47,6 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
-import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.expression.VariableExpr;
@@ -54,7 +58,6 @@
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
-import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
 import org.apache.asterix.metadata.MetadataManager;
@@ -62,14 +65,12 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
@@ -84,7 +85,7 @@
 
     private final FunctionSignature signature;
     private final String procedureBody;
-    private final Statement procedureBodyStatement;
+    private Statement procedureBodyStatement;
     private final List<String> paramList;
     private final List<VariableExpr> varList;
     private final CallExpr period;
@@ -92,20 +93,44 @@
     private List<List<List<String>>> dependencies;
 
     public CreateProcedureStatement(FunctionSignature signature, List<VarIdentifier> parameterList,
-            List<Integer> paramIds, String functionBody, Statement procedureBodyStatement, Expression period) {
+            List<Integer> paramIds, String functionBody, Expression period) {
         this.signature = signature;
-        this.procedureBody = functionBody;
-        this.procedureBodyStatement = procedureBodyStatement;
         this.paramList = new ArrayList<>();
         this.varList = new ArrayList<>();
         for (int i = 0; i < parameterList.size(); i++) {
-            this.paramList.add(parameterList.get(i).getValue());
-            this.varList.add(new VariableExpr(new VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
+            this.paramList.add(parameterList.get(i).getValue().substring(1));
+            this.varList.add(
+                    new VariableExpr(new VarIdentifier(parameterList.get(i).getValue().substring(1), paramIds.get(i))));
         }
+        procedureBody = rewriteJobParams(functionBody);
         this.period = (CallExpr) period;
         this.dependencies = new ArrayList<>();
         this.dependencies.add(new ArrayList<>());
         this.dependencies.add(new ArrayList<>());
+    }
+
+    /**
+     * Rewrites the procedure body so that each reference to a declared parameter is replaced by a
+     * {@code get_job_param("<name>")} call, then wraps the result as
+     * {@code use <namespace>;\n<body>;} using the namespace of the procedure signature.
+     *
+     * A parameter name is replaced only where it is not directly preceded or followed by a word
+     * character. Look-around assertions are used instead of capturing the neighbouring characters,
+     * so references separated by a single character (e.g. {@code f(x,x)}) are all rewritten. The
+     * name is quoted in both the pattern and the replacement so that it is always taken literally.
+     *
+     * @param body the procedure body text handed to the constructor
+     * @return the rewritten body
+     */
+    private String rewriteJobParams(String body) {
+        String newBody = body;
+        for (VariableExpr var : varList) {
+            String name = String.valueOf(var.getVar());
+            Pattern variableReference = Pattern.compile("(?<!\\w)" + Pattern.quote(name) + "(?!\\w)");
+            Matcher matcher = variableReference.matcher(newBody);
+            newBody = matcher.replaceAll(Matcher.quoteReplacement("get_job_param(\"" + name + "\")"));
+        }
+        return "use " + signature.getNamespace() + ";\n" + newBody + ";";
     }
 
     public String getProcedureBody() {
@@ -142,7 +153,14 @@
         return null;
     }
 
-    private void initialize() throws MetadataException, HyracksDataException {
+    private void initialize() throws CompilationException, 
HyracksDataException {
+        BADParserFactory factory = new BADParserFactory();
+        List<Statement> fStatements = factory.createParser(new 
StringReader(procedureBody)).parse();
+        if (fStatements.size() != 2) {
+            //TODO: Add a test for this error
+            throw new CompilationException("Procedure can only execute a 
single statement");
+        }
+        procedureBodyStatement = fStatements.get(1);
         if (period == null) {
             return;
         }
@@ -155,40 +173,6 @@
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream outputStream = new DataOutputStream(bos);
         durationParser.parse(duration.toCharArray(), 0, 
duration.toCharArray().length, outputStream);
-    }
-
-    private JobSpecification compileQueryJob(IStatementExecutor 
statementExecutor, MetadataProvider metadataProvider,
-            IHyracksClientConnection hcc, Query q) throws Exception {
-        MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
-        boolean bActiveTxn = true;
-        metadataProvider.setMetadataTxnContext(mdTxnCtx);
-        JobSpecification jobSpec = null;
-        try {
-            jobSpec = ((QueryTranslator) 
statementExecutor).rewriteCompileQuery(hcc, metadataProvider, q, null);
-            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-            bActiveTxn = false;
-        } catch (Exception e) {
-            LOGGER.log(Level.INFO, e.getMessage(), e);
-            if (bActiveTxn) {
-                ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx);
-            }
-            throw e;
-        }
-        return jobSpec;
-    }
-
-    private void addLets(SelectExpression s) {
-        FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
-        FunctionSignature sig =
-                new FunctionSignature(function.getNamespace(), 
function.getName(), function.getArity());
-        for (VariableExpr var : varList) {
-            List<Expression> strListForCall = new ArrayList<>();
-            LiteralExpr l = new LiteralExpr(new 
StringLiteral(var.getVar().getValue()));
-            strListForCall.add(l);
-            Expression con = new CallExpr(sig, strListForCall);
-            LetClause let = new LetClause(var, con);
-            s.getLetList().add(let);
-        }
     }
 
     private Pair<JobSpecification, PrecompiledType> 
createProcedureJob(IStatementExecutor statementExecutor,
@@ -207,28 +191,23 @@
                             getProcedureBodyStatement(), hcc, hdc, 
ResultDelivery.ASYNC, null, stats, true, null),
                     PrecompiledType.INSERT);
         } else if (getProcedureBodyStatement().getKind() == 
Statement.Kind.QUERY) {
-            Query s = (Query) getProcedureBodyStatement();
-            addLets((SelectExpression) s.getBody());
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
             
dependencies.get(1).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
                     ((Query) getProcedureBodyStatement()).getBody(), 
metadataProvider).get(1));
-            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
-                    compileQueryJob(statementExecutor, metadataProvider, hcc, 
(Query) getProcedureBodyStatement()),
+            Pair<JobSpecification, PrecompiledType> pair = new 
Pair<>(BADJobService.compileQueryJob(statementExecutor,
+                    metadataProvider, hcc, (Query) 
getProcedureBodyStatement()),
                     PrecompiledType.QUERY);
             
dependencies.get(0).addAll(FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(),
                     ((Query) getProcedureBodyStatement()).getBody(), 
metadataProvider).get(0));
-            metadataProvider.getLocks().unlock();
             return pair;
         } else if (getProcedureBodyStatement().getKind() == 
Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new 
SqlppDeleteRewriteVisitor();
             getProcedureBodyStatement().accept(visitor, null);
             DeleteStatement delete = (DeleteStatement) 
getProcedureBodyStatement();
-            addLets((SelectExpression) delete.getQuery().getBody());
 
             SqlppRewriterFactory fact = new SqlppRewriterFactory();
             dependencies = 
FunctionUtil.getFunctionDependencies(fact.createQueryRewriter(), 
delete.getQuery().getBody(),
                     metadataProvider);
-
             Pair<JobSpecification, PrecompiledType> pair =
                     new Pair<>(((QueryTranslator) 
statementExecutor).handleDeleteStatement(metadataProvider,
                     getProcedureBodyStatement(), hcc, true), 
PrecompiledType.DELETE);
@@ -276,24 +255,16 @@
             if (alreadyActive) {
                 throw new AsterixException("Procedure " + signature.getName() 
+ " is already running");
             }
-            MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
-                    metadataProvider.getDefaultDataverse());
-            tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
             metadataProvider.setResultSetId(new ResultSetId(resultSetId++));
             final ResultDelivery resultDelivery = 
requestParameters.getResultProperties().getDelivery();
             final IHyracksDataset hdc = requestParameters.getHyracksDataset();
             final Stats stats = requestParameters.getStats();
             boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || 
resultDelivery == ResultDelivery.DEFERRED;
             metadataProvider.setResultAsyncMode(resultsAsync);
-            tempMdProvider.setResultSetId(metadataProvider.getResultSetId());
-            tempMdProvider.setResultAsyncMode(resultsAsync);
-            
tempMdProvider.setWriterFactory(metadataProvider.getWriterFactory());
-            
tempMdProvider.setResultSerializerFactoryProvider(metadataProvider.getResultSerializerFactoryProvider());
-            tempMdProvider.setOutputFile(metadataProvider.getOutputFile());
-            
tempMdProvider.setMaxResultReads(requestParameters.getResultProperties().getMaxReads());
+            metadataProvider.setMaxResultReads(1);
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(statementExecutor, tempMdProvider, hcc, 
hdc, stats);
+                    createProcedureJob(statementExecutor, metadataProvider, 
hcc, hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
@@ -301,7 +272,8 @@
                         "BadListener");
                 activeEventHandler.registerListener(listener);
             }
-            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, 
listener, tempMdProvider.getResultSetId(), hdc,
+            setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, 
listener, metadataProvider.getResultSetId(),
+                    hdc,
                     stats);
 
             procedure = new Procedure(dataverse, signature.getName(), 
signature.getArity(), getParamList(),
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 635f2ce..025b9e6 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -24,18 +24,14 @@
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.apache.asterix.active.DeployedJobService;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.ExtensionStatement;
-import org.apache.asterix.api.http.server.ResultUtil;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.BADJobService;
 import org.apache.asterix.bad.lang.BADLangExtension;
 import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
-import 
org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -54,12 +50,10 @@
 import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement extends ExtensionStatement {
@@ -111,10 +105,9 @@
         boolean txnActive = false;
         EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD, 
dataverse, procedureName);
         DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) 
activeEventHandler.getListener(entityId);
-        Procedure procedure = null;
+        Procedure procedure;
 
         MetadataTransactionContext mdTxnCtx = null;
-        JobId jobId;
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             txnActive = true;
@@ -125,31 +118,14 @@
             Map<byte[], byte[]> contextRuntimeVarMap = 
createParameterMap(procedure);
             DeployedJobSpecId deployedJobSpecId = 
listener.getDeployedJobSpecId();
             if (procedure.getDuration().equals("")) {
+                BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, 
contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), appCtx, listener, 
(QueryTranslator) statementExecutor);
 
-                //Add the Asterix Transaction Id to the map
-                long newTxId = 
metadataProvider.getTxnIdFactory().create().getId();
-                
contextRuntimeVarMap.put(BADConstants.TRANSACTION_ID_PARAMETER_NAME,
-                        String.valueOf(newTxId).getBytes());
-                jobId = hcc.startJob(deployedJobSpecId, contextRuntimeVarMap);
-
-                boolean wait = 
Boolean.parseBoolean(metadataProvider.getConfig().get(
-                        ExecuteProcedureStatement.WAIT_FOR_COMPLETION));
-                if (wait || listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(jobId);
-                }
-
-                if (listener.getType() == PrecompiledType.QUERY) {
-                    ResultReader resultReader =
-                            new ResultReader(listener.getResultDataset(), 
jobId, listener.getResultId());
-
-                    ResultUtil.printResults(appCtx, resultReader,
-                            ((QueryTranslator) 
statementExecutor).getSessionOutput(), new Stats(), null);
-                }
 
             } else {
-                ScheduledExecutorService ses = 
DeployedJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
-                        ChannelJobService.findPeriod(procedure.getDuration()), 
contextRuntimeVarMap, entityId,
-                        metadataProvider.getTxnIdFactory());
+                ScheduledExecutorService ses = 
BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc,
+                        BADJobService.findPeriod(procedure.getDuration()), 
contextRuntimeVarMap, entityId,
+                        metadataProvider.getTxnIdFactory(), listener);
                 listener.storeDistributedInfo(deployedJobSpecId, ses, 
listener.getResultDataset(),
                         listener.getResultId());
             }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 526e091..1e5e627 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -50,18 +50,20 @@
     public static final int CHANNEL_ARECORD_FUNCTION_FIELD_INDEX = 4;
     public static final int CHANNEL_ARECORD_DURATION_FIELD_INDEX = 5;
     public static final int CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX = 6;
+    public static final int CHANNEL_ARECORD_BODY_FIELD_INDEX = 7;
     public static final ARecordType CHANNEL_RECORDTYPE = 
MetadataRecordTypes.createRecordType(
             // RecordTypeName
             BADConstants.RECORD_TYPENAME_CHANNEL,
             // FieldNames
             new String[] { BADConstants.DataverseName, 
BADConstants.ChannelName, BADConstants.SubscriptionsDatasetName,
                     BADConstants.ResultsDatasetName, BADConstants.Function, 
BADConstants.Duration,
-                    BADConstants.FIELD_NAME_DEPENDENCIES },
+                    BADConstants.FIELD_NAME_DEPENDENCIES, 
BADConstants.FIELD_NAME_BODY },
             // FieldTypes
             new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, 
BuiltinType.ASTRING, BuiltinType.ASTRING,
                     new AOrderedListType(BuiltinType.ASTRING, null), 
BuiltinType.ASTRING,
                     new AOrderedListType(new AOrderedListType(new 
AOrderedListType(BuiltinType.ASTRING, null), null),
-                            null) },
+                            null),
+                    BuiltinType.ASTRING },
             //IsOpen?
             true);
     //------------------------------------------ Broker 
----------------------------------------//
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
index 5f7dad0..ed9346c 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Channel.java
@@ -37,6 +37,7 @@
     private final String subscriptionsDatasetName;
     private final String resultsDatasetName;
     private final String duration;
+    private final String channelBody;
     private final FunctionSignature function;
     private final List<String> functionAsPath;
     /*
@@ -49,12 +50,13 @@
     private final List<List<List<String>>> dependencies;
 
     public Channel(String dataverseName, String channelName, String 
subscriptionsDataset, String resultsDataset,
-            FunctionSignature function, String duration, 
List<List<List<String>>> dependencies) {
+            FunctionSignature function, String duration, 
List<List<List<String>>> dependencies, String channelBody) {
         this.channelId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, 
dataverseName, channelName);
         this.function = function;
         this.duration = duration;
         this.resultsDatasetName = resultsDataset;
         this.subscriptionsDatasetName = subscriptionsDataset;
+        this.channelBody = channelBody;
         if (this.function.getNamespace() == null) {
             this.function.setNamespace(dataverseName);
         }
@@ -94,6 +96,10 @@
         return duration;
     }
 
+    public String getChannelBody() {
+        return channelBody;
+    }
+
     public List<String> getFunctionAsPath() {
         return functionAsPath;
     }
diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
index 14db134..175280e 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelTupleTranslator.java
@@ -123,11 +123,15 @@
 
         }
 
+        String channelBody =
+                ((AString) 
channelRecord.getValueByPos(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX))
+                        .getStringValue();
+
         FunctionSignature signature = new 
FunctionSignature(functionSignature.get(0), functionSignature.get(1),
                 Integer.parseInt(functionSignature.get(2)));
 
         channel = new Channel(dataverseName, channelName, subscriptionsName, 
resultsName, signature, duration,
-                dependencies);
+                dependencies, channelBody);
         return channel;
     }
 
@@ -217,6 +221,12 @@
         dependenciesListBuilder.write(fieldValue.getDataOutput(), true);
         
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_DEPENDENCIES_FIELD_INDEX,
 fieldValue);
 
+        // write field 7
+        fieldValue.reset();
+        aString.setValue(channel.getChannelBody());
+        stringSerde.serialize(aString, fieldValue.getDataOutput());
+        
recordBuilder.addField(BADMetadataRecordTypes.CHANNEL_ARECORD_BODY_FIELD_INDEX, 
fieldValue);
+
         // write record
         recordBuilder.write(tupleBuilder.getDataOutput(), true);
 
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
index 070c148..78f7c95 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.bad.metadata;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.asterix.active.ActiveEvent;
 import org.apache.asterix.active.ActiveEvent.Kind;
 import org.apache.asterix.active.ActivityState;
@@ -25,37 +27,26 @@
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.metadata.IDataset;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.DeployedJobSpecId;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
 
 public class DeployedJobSpecEventListener implements 
IActiveEntityEventsListener {
 
     private static final Logger LOGGER = 
Logger.getLogger(DeployedJobSpecEventListener.class);
 
-
     public enum PrecompiledType {
         CHANNEL,
+        PUSH_CHANNEL,
         QUERY,
         INSERT,
         DELETE
-    }
-
-    enum RequestState {
-        INIT,
-        STARTED,
-        FINISHED
     }
 
     private DeployedJobSpecId deployedJobSpecId;
@@ -67,14 +58,11 @@
 
     // members
     protected volatile ActivityState state;
-    protected JobId jobId;
-    protected final List<IActiveEntityEventSubscriber> subscribers = new 
ArrayList<>();
     protected final ICcApplicationContext appCtx;
     protected final EntityId entityId;
     protected final ActiveEvent statsUpdatedEvent;
     protected long statsTimestamp;
     protected String stats;
-    protected RequestState statsRequestState;
     protected final String runtimeName;
     protected final AlgebricksAbsolutePartitionConstraint locations;
     private int runningInstance;
@@ -83,17 +71,14 @@
             AlgebricksAbsolutePartitionConstraint locations, String 
runtimeName) {
         this.appCtx = appCtx;
         this.entityId = entityId;
-        this.state = ActivityState.STOPPED;
+        setState(ActivityState.STOPPED);
         this.statsTimestamp = -1;
-        this.statsRequestState = RequestState.INIT;
         this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, 
entityId, null);
         this.stats = "{\"Stats\":\"N/A\"}";
         this.runtimeName = runtimeName;
         this.locations = locations;
-        state = ActivityState.STOPPED;
         this.type = type;
     }
-
 
     public IHyracksDataset getResultDataset() {
         return hdc;
@@ -122,10 +107,6 @@
         return false;
     }
 
-    public JobId getJobId() {
-        return jobId;
-    }
-
     @Override
     public String getStats() {
         return stats;
@@ -134,40 +115,6 @@
     @Override
     public long getStatsTimeStamp() {
         return statsTimestamp;
-    }
-
-    public String formatStats(List<String> responses) {
-        StringBuilder strBuilder = new StringBuilder();
-        strBuilder.append("{\"Stats\": [").append(responses.get(0));
-        for (int i = 1; i < responses.size(); i++) {
-            strBuilder.append(", ").append(responses.get(i));
-        }
-        strBuilder.append("]}");
-        return strBuilder.toString();
-    }
-
-    protected synchronized void notifySubscribers(ActiveEvent event) {
-        notifyAll();
-        Iterator<IActiveEntityEventSubscriber> it = subscribers.iterator();
-        while (it.hasNext()) {
-            IActiveEntityEventSubscriber subscriber = it.next();
-            if (subscriber.isDone()) {
-                it.remove();
-            } else {
-                try {
-                    subscriber.notify(event);
-                } catch (HyracksDataException e) {
-                    LOGGER.log(Level.WARN, "Failed to notify subscriber", e);
-                }
-                if (subscriber.isDone()) {
-                    it.remove();
-                }
-            }
-        }
-    }
-
-    public AlgebricksAbsolutePartitionConstraint getLocations() {
-        return locations;
     }
 
     public PrecompiledType getType() {
@@ -214,12 +161,18 @@
         // no op
     }
 
+    protected synchronized void setState(ActivityState newState) {
+        LOGGER.info("State of " + getEntityId() + "is being set to " + 
newState + " from " + state);
+        this.state = newState;
+        notifyAll();
+    }
+
     private synchronized void handleJobStartEvent(ActiveEvent message) throws 
Exception {
         if (LOGGER.isInfoEnabled()) {
             LOGGER.info("Channel Job started for  " + entityId);
         }
         runningInstance++;
-        state = ActivityState.RUNNING;
+        setState(ActivityState.RUNNING);
     }
 
     private synchronized void handleJobFinishEvent(ActiveEvent message) throws 
Exception {
@@ -228,10 +181,34 @@
         }
         runningInstance--;
         if (runningInstance == 0) {
-            state = ActivityState.STOPPED;
+            setState(ActivityState.STOPPED);
         }
     }
 
+    public synchronized void waitWhileAtState(ActivityState undesiredState) 
throws InterruptedException {
+        while (state == undesiredState) {
+            this.wait();
+        }
+    }
+
+    public synchronized void suspend() throws HyracksDataException, 
InterruptedException {
+        LOGGER.info("Suspending entity " + entityId);
+        LOGGER.info("Waiting for ongoing activities of " + entityId);
+        waitWhileAtState(ActivityState.RUNNING);
+        LOGGER.info("Proceeding with suspension of " + entityId + ". Current 
state is " + state);
+        setState(ActivityState.SUSPENDED);
+        LOGGER.info("Successfully Suspended " + entityId);
+    }
+
+    public synchronized void resume() throws HyracksDataException {
+        LOGGER.info("Resuming entity " + entityId);
+        if (state != ActivityState.SUSPENDED) {
+            throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_RESUME_FROM_STATE, 
entityId, state);
+        }
+        setState(ActivityState.STOPPED);
+        LOGGER.info("Successfully resumed " + entityId);
+    }
+
     @Override
     public synchronized void subscribe(IActiveEntityEventSubscriber 
subscriber) throws HyracksDataException {
         // no op
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index 5712539..50d506b 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -28,10 +28,7 @@
 
 public class Procedure implements IExtensionMetadataEntity {
     private static final long serialVersionUID = 1L;
-    public static final String LANGUAGE_JAVA = "JAVA";
-
     public static final String RETURNTYPE_VOID = "VOID";
-    public static final String NOT_APPLICABLE = "N/A";
 
     private final EntityId procedureId;
     private final int arity;
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt 
b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 2d7ba75..4c83dc5 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -161,7 +161,7 @@
     }
   ("period" period = FunctionCallExpr())?
   {
-  return new CreateProcedureStatement(signature, paramList, paramIds, 
functionBody, functionBodyExpr, period);
+  return new CreateProcedureStatement(signature, paramList, paramIds, 
functionBody, period);
   }
 }
 
diff --git 
a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java 
b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
new file mode 100644
index 0000000..1cd49e3
--- /dev/null
+++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.test;
+
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class BADListenerTest {
+
+    private static DeployedJobSpecEventListener djsel;
+
+    private class suspend extends Thread {
+        @Override
+        public void run() {
+            try {
+                djsel.suspend();
+                Thread.sleep(5000);
+                djsel.resume();
+            } catch (HyracksDataException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    private class run extends Thread {
+        @Override
+        public void run() {
+            try {
+                djsel.notify(new ActiveEvent(null, 
ActiveEvent.Kind.JOB_STARTED, null, null));
+                djsel.notify(new ActiveEvent(null, 
ActiveEvent.Kind.JOB_STARTED, null, null));
+                djsel.notify(new ActiveEvent(null, 
ActiveEvent.Kind.JOB_FINISHED, null, null));
+                Thread.sleep(5000);
+                djsel.notify(new ActiveEvent(null, 
ActiveEvent.Kind.JOB_FINISHED, null, null));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+    }
+
+    @BeforeClass
+    public static void init() {
+        djsel = new DeployedJobSpecEventListener(null,
+                new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, "test", 
"test"),
+                DeployedJobSpecEventListener.PrecompiledType.CHANNEL, null, 
"BadListener");
+    }
+
+    @Test
+    public void DistributedTest() throws Exception {
+        new suspend().run();
+        djsel.waitWhileAtState(ActivityState.SUSPENDED);
+        new run().run();
+        djsel.suspend();
+    }
+
+    @AfterClass
+    public static void deinit() throws Exception {
+
+    }
+}
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
new file mode 100644
index 0000000..819d052
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.1.ddl.sqlpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string
+};
+
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index u_location on UserLocations(location) type RTREE;
+
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r)report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where 
spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 
period duration("PT10S");
+
+create broker brokerA at "http://www.notifyA.com";
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
new file mode 100644
index 0000000..0a38e41
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.2.update.sqlpp
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+insert into EmergencyChannelSubscriptions(
+[
+{"param0" : "w2294u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4321u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3398u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w2488u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3666u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4489u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p78u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "p544u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "p711u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t2828u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4796u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4082u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4923u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w2324u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1339u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p520u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c1092u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4979u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1487u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4330u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3682u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p117u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "w1741u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w2434u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3833u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1373u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p89u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4003u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c910u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4961u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4475u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w1960u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p438u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c1362u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p588u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c902u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4684u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1609u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1510u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3851u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1418u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2559u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w1815u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4924u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3320u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p663u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4571u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p781u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c919u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c1121u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p814u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4006u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2822u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4953u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3486u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3107u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2836u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w2003u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3256u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4762u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4900u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p357u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t3630u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3166u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4687u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p817u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4433u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3426u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p582u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t3388u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4823u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1664u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4051u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c857u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "c1412u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2521u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3114u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p404u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "p111u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t3006u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2903u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2823u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4153u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2589u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1459u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p766u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "p593u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "p168u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t4253u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4177u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p387u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t2571u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "c1513u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p618u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
},
+{"param0" : "t2735u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t4859u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "w1848u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t3306u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "t2558u1" , "DataverseName" : "channels" , "BrokerName" : 
"brokerA" },
+{"param0" : "p180u1" , "DataverseName" : "channels" , "BrokerName" : "brokerA" 
}
+]
+);
+
+insert into Shelters (
+[
+{"shelterName" : "s5064" , "location" : point("2437.34,1330.59") },
+{"shelterName" : "s5180" , "location" : point("2479.66,939.05") },
+{"shelterName" : "s5025" , "location" : point("3343.08,1139.29") },
+{"shelterName" : "s5218" , "location" : point("1361.85,795.99") },
+{"shelterName" : "s5199" , "location" : point("3619.88,1245.75") },
+{"shelterName" : "s5167" , "location" : point("2836.86,237.96") },
+{"shelterName" : "s5068" , "location" : point("2580.27,881.71") },
+{"shelterName" : "s5026" , "location" : point("2040.75,816.3") },
+{"shelterName" : "s5080" , "location" : point("2785.14,1090.68") },
+{"shelterName" : "s5190" , "location" : point("1296.09,1256.87") },
+{"shelterName" : "s5126" , "location" : point("1121.37,1478.3") },
+{"shelterName" : "s5075" , "location" : point("2679.51,488.23") },
+{"shelterName" : "s5187" , "location" : point("2544.14,2464.15") },
+{"shelterName" : "s5061" , "location" : point("1674.35,1265.46") },
+{"shelterName" : "s5057" , "location" : point("1677.96,1151.83") },
+{"shelterName" : "s5149" , "location" : point("2236.78,1068.51") },
+{"shelterName" : "s5058" , "location" : point("2919.37,2055.98") },
+{"shelterName" : "s5144" , "location" : point("731.17,1504.74") },
+{"shelterName" : "s5099" , "location" : point("2594.22,1078.11") },
+{"shelterName" : "s5030" , "location" : point("3678.46,978.38") },
+{"shelterName" : "s5050" , "location" : point("2395.61,1332.9") },
+{"shelterName" : "s5209" , "location" : point("3093.56,1833.18") },
+{"shelterName" : "s5052" , "location" : point("3052.36,1076.89") },
+{"shelterName" : "s5208" , "location" : point("1513.18,796.41") },
+{"shelterName" : "s5027" , "location" : point("671.97,2004.72") },
+{"shelterName" : "s5093" , "location" : point("2297.68,1933.79") },
+{"shelterName" : "s5202" , "location" : point("2854.73,1153.14") },
+{"shelterName" : "s5106" , "location" : point("1971.88,737.51") },
+{"shelterName" : "s5192" , "location" : point("2638.68,1688.7") },
+{"shelterName" : "s5128" , "location" : point("1970.48,2569.09") },
+{"shelterName" : "s5098" , "location" : point("2596.02,1499.58") },
+{"shelterName" : "s5038" , "location" : point("2297.07,995.53") },
+{"shelterName" : "s5176" , "location" : point("1570.34,1701.0") },
+{"shelterName" : "s5066" , "location" : point("820.41,2092.07") },
+{"shelterName" : "s5108" , "location" : point("459.09,395.34") },
+{"shelterName" : "s5162" , "location" : point("2543.99,1822.57") },
+{"shelterName" : "s5195" , "location" : point("3190.14,979.74") },
+{"shelterName" : "s5212" , "location" : point("2478.36,921.15") },
+{"shelterName" : "s5217" , "location" : point("2411.71,807.77") },
+{"shelterName" : "s5049" , "location" : point("447.77,1527.37") },
+{"shelterName" : "s5163" , "location" : point("2356.91,1221.63") },
+{"shelterName" : "s5048" , "location" : point("1495.47,1597.15") },
+{"shelterName" : "s5154" , "location" : point("1503.31,884.66") },
+{"shelterName" : "s5116" , "location" : point("1359.98,314.68") },
+{"shelterName" : "s5143" , "location" : point("3000.77,1080.81") },
+{"shelterName" : "s5073" , "location" : point("2404.16,1516.85") },
+{"shelterName" : "s5032" , "location" : point("2370.75,1879.99") },
+{"shelterName" : "s5152" , "location" : point("1091.19,706.09") },
+{"shelterName" : "s5044" , "location" : point("2395.55,1378.92") },
+{"shelterName" : "s5081" , "location" : point("2057.88,838.33") },
+{"shelterName" : "s5130" , "location" : point("1511.27,1082.85") },
+{"shelterName" : "s5142" , "location" : point("2634.22,1631.85") },
+{"shelterName" : "s5071" , "location" : point("1799.13,2130.04") },
+{"shelterName" : "s5051" , "location" : point("2414.52,807.3") },
+{"shelterName" : "s5147" , "location" : point("1960.71,976.11") },
+{"shelterName" : "s5021" , "location" : point("2659.55,1957.04") },
+{"shelterName" : "s5139" , "location" : point("1863.26,1216.29") },
+{"shelterName" : "s5062" , "location" : point("937.48,1608.88") },
+{"shelterName" : "s5168" , "location" : point("2245.27,1012.57") },
+{"shelterName" : "s5196" , "location" : point("2341.23,1883.0") },
+{"shelterName" : "s5119" , "location" : point("1702.71,2557.47") },
+{"shelterName" : "s5150" , "location" : point("1613.07,1804.66") },
+{"shelterName" : "s5171" , "location" : point("2789.6,931.73") },
+{"shelterName" : "s5102" , "location" : point("3483.61,1156.18") },
+{"shelterName" : "s5091" , "location" : point("1081.51,1596.95") },
+{"shelterName" : "s5132" , "location" : point("2099.11,1086.28") },
+{"shelterName" : "s5104" , "location" : point("2896.78,461.73") },
+{"shelterName" : "s5022" , "location" : point("1920.77,1657.89") },
+{"shelterName" : "s5219" , "location" : point("2913.07,954.89") },
+{"shelterName" : "s5088" , "location" : point("1831.54,2241.22") },
+{"shelterName" : "s5166" , "location" : point("1087.15,2048.13") },
+{"shelterName" : "s5203" , "location" : point("2543.5,1815.38") },
+{"shelterName" : "s5136" , "location" : point("2503.29,1270.07") },
+{"shelterName" : "s5194" , "location" : point("1595.1,1634.0") },
+{"shelterName" : "s5060" , "location" : point("2230.74,818.1") },
+{"shelterName" : "s5127" , "location" : point("2567.07,1104.86") },
+{"shelterName" : "s5092" , "location" : point("1732.18,1170.23") },
+{"shelterName" : "s5124" , "location" : point("2456.58,983.76") },
+{"shelterName" : "s5201" , "location" : point("1875.2,1300.76") },
+{"shelterName" : "s5029" , "location" : point("2581.92,690.14") },
+{"shelterName" : "s5146" , "location" : point("2437.06,2491.18") },
+{"shelterName" : "s5074" , "location" : point("1761.92,2035.44") },
+{"shelterName" : "s5173" , "location" : point("1000.01,1488.16") },
+{"shelterName" : "s5039" , "location" : point("2604.75,630.95") },
+{"shelterName" : "s5020" , "location" : point("1920.62,670.07") },
+{"shelterName" : "s5120" , "location" : point("1562.27,1045.02") },
+{"shelterName" : "s5083" , "location" : point("964.22,1606.66") },
+{"shelterName" : "s5122" , "location" : point("2253.13,1556.55") },
+{"shelterName" : "s5103" , "location" : point("2023.99,2505.02") },
+{"shelterName" : "s5155" , "location" : point("2996.58,390.66") },
+{"shelterName" : "s5076" , "location" : point("1025.57,515.66") },
+{"shelterName" : "s5086" , "location" : point("2384.13,886.5") },
+{"shelterName" : "s5053" , "location" : point("1173.98,2173.29") },
+{"shelterName" : "s5216" , "location" : point("2865.64,1182.0") },
+{"shelterName" : "s5065" , "location" : point("2633.42,574.61") },
+{"shelterName" : "s5055" , "location" : point("1236.87,876.11") },
+{"shelterName" : "s5215" , "location" : point("2272.8,1115.83") },
+{"shelterName" : "s5210" , "location" : point("1314.22,729.82") },
+{"shelterName" : "s5200" , "location" : point("1776.8,1176.29") },
+{"shelterName" : "s5165" , "location" : point("3485.44,980.34") },
+{"shelterName" : "s5042" , "location" : point("1812.4,1252.84") }
+]
+);
+
+insert into UserLocations (
+[
+{"userName" : "w2294u1" , "location" : circle("2683.3,480.84 100.0")},
+{"userName" : "t4321u1" , "location" : circle("1990.77,754.24 100.0")},
+{"userName" : "t3398u1" , "location" : circle("2791.2,962.92 100.0")},
+{"userName" : "w2488u1" , "location" : circle("2040.19,767.35 100.0")},
+{"userName" : "t3666u1" , "location" : circle("3968.68,1308.04 100.0")},
+{"userName" : "t4489u1" , "location" : circle("1713.82,252.6 100.0")},
+{"userName" : "p78u1" , "location" : circle("2588.34,735.72 100.0")},
+{"userName" : "p544u1" , "location" : circle("1993.69,1465.29 100.0")},
+{"userName" : "p711u1" , "location" : circle("3221.64,1062.22 100.0")},
+{"userName" : "t2828u1" , "location" : circle("2534.49,978.95 100.0")},
+{"userName" : "t4796u1" , "location" : circle("1752.58,323.55 100.0")},
+{"userName" : "t4082u1" , "location" : circle("3571.17,1213.78 100.0")},
+{"userName" : "t4923u1" , "location" : circle("1878.19,587.26 100.0")},
+{"userName" : "w2324u1" , "location" : circle("2851.94,1429.02 100.0")},
+{"userName" : "c1339u1" , "location" : circle("2881.61,956.95 100.0")},
+{"userName" : "p520u1" , "location" : circle("2927.9,986.41 100.0")},
+{"userName" : "c1092u1" , "location" : circle("1670.02,1019.43 100.0")},
+{"userName" : "t4979u1" , "location" : circle("1856.46,543.47 100.0")},
+{"userName" : "c1487u1" , "location" : circle("1757.21,745.68 100.0")},
+{"userName" : "t4330u1" , "location" : circle("1596.34,59.37 100.0")},
+{"userName" : "t3682u1" , "location" : circle("3557.22,1204.63 100.0")},
+{"userName" : "p117u1" , "location" : circle("1717.46,1166.14 100.0")},
+{"userName" : "w1741u1" , "location" : circle("2517.33,980.01 100.0")},
+{"userName" : "w2434u1" , "location" : circle("527.29,1520.52 100.0")},
+{"userName" : "t3833u1" , "location" : circle("3521.17,1180.97 100.0")},
+{"userName" : "c1373u1" , "location" : circle("1733.98,742.36 100.0")},
+{"userName" : "p89u1" , "location" : circle("3768.73,1013.03 100.0")},
+{"userName" : "t4003u1" , "location" : circle("2945.36,952.79 100.0")},
+{"userName" : "c910u1" , "location" : circle("1124.86,1275.91 100.0")},
+{"userName" : "t4961u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4475u1" , "location" : circle("1607.59,79.94 100.0")},
+{"userName" : "w1960u1" , "location" : circle("1768.23,2263.45 100.0")},
+{"userName" : "p438u1" , "location" : circle("898.13,238.92 100.0")},
+{"userName" : "c1362u1" , "location" : circle("1438.64,400.67 100.0")},
+{"userName" : "p588u1" , "location" : circle("3206.01,1052.3 100.0")},
+{"userName" : "c902u1" , "location" : circle("1615.16,1251.39 100.0")},
+{"userName" : "t4684u1" , "location" : circle("1876.6,584.06 100.0")},
+{"userName" : "c1609u1" , "location" : circle("2320.02,725.01 100.0")},
+{"userName" : "c1510u1" , "location" : circle("2256.6,816.78 100.0")},
+{"userName" : "t3851u1" , "location" : circle("3807.01,1304.1 100.0")},
+{"userName" : "c1418u1" , "location" : circle("2273.25,815.93 100.0")},
+{"userName" : "t2559u1" , "location" : circle("2297.49,1887.46 100.0")},
+{"userName" : "w1815u1" , "location" : circle("1539.89,343.08 100.0")},
+{"userName" : "t4924u1" , "location" : circle("1609.65,83.72 100.0")},
+{"userName" : "t3320u1" , "location" : circle("2322.45,1277.89 100.0")},
+{"userName" : "p663u1" , "location" : circle("2291.38,254.83 100.0")},
+{"userName" : "t4571u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p781u1" , "location" : circle("1154.4,712.8 100.0")},
+{"userName" : "c919u1" , "location" : circle("1721.46,485.13 100.0")},
+{"userName" : "c1121u1" , "location" : circle("4171.58,1083.41 100.0")},
+{"userName" : "p814u1" , "location" : circle("2176.13,990.55 100.0")},
+{"userName" : "t4006u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2822u1" , "location" : circle("3087.51,1849.01 100.0")},
+{"userName" : "t4953u1" , "location" : circle("1923.91,676.56 100.0")},
+{"userName" : "t3486u1" , "location" : circle("3832.72,1320.87 100.0")},
+{"userName" : "t3107u1" , "location" : circle("2994.64,2023.92 100.0")},
+{"userName" : "t2836u1" , "location" : circle("2307.27,2096.07 100.0")},
+{"userName" : "w2003u1" , "location" : circle("1976.08,2279.1 100.0")},
+{"userName" : "t3256u1" , "location" : circle("2161.32,1700.73 100.0")},
+{"userName" : "t4762u1" , "location" : circle("1916.88,662.71 100.0")},
+{"userName" : "t4900u1" , "location" : circle("2253.78,1090.84 100.0")},
+{"userName" : "p357u1" , "location" : circle("1699.66,1976.29 100.0")},
+{"userName" : "t3630u1" , "location" : circle("3778.3,1285.37 100.0")},
+{"userName" : "t3166u1" , "location" : circle("2743.85,965.95 100.0")},
+{"userName" : "t4687u1" , "location" : circle("1798.2,426.06 100.0")},
+{"userName" : "p817u1" , "location" : circle("1446.92,909.48 100.0")},
+{"userName" : "t4433u1" , "location" : circle("1805.69,441.15 100.0")},
+{"userName" : "t3426u1" , "location" : circle("3055.08,945.53 100.0")},
+{"userName" : "p582u1" , "location" : circle("1523.47,739.18 100.0")},
+{"userName" : "t3388u1" , "location" : circle("2919.63,954.46 100.0")},
+{"userName" : "t4823u1" , "location" : circle("2174.09,987.9 100.0")},
+{"userName" : "c1664u1" , "location" : circle("1283.46,1099.95 100.0")},
+{"userName" : "t4051u1" , "location" : circle("4047.21,1215.22 100.0")},
+{"userName" : "c857u1" , "location" : circle("1270.33,664.46 100.0")},
+{"userName" : "c1412u1" , "location" : circle("1279.63,1731.15 100.0")},
+{"userName" : "t2521u1" , "location" : circle("2850.02,1040.64 100.0")},
+{"userName" : "t3114u1" , "location" : circle("2294.53,995.82 100.0")},
+{"userName" : "p404u1" , "location" : circle("3789.35,1022.15 100.0")},
+{"userName" : "p111u1" , "location" : circle("2054.96,904.61 100.0")},
+{"userName" : "t3006u1" , "location" : circle("3095.66,1781.54 100.0")},
+{"userName" : "t2903u1" , "location" : circle("2449.83,984.17 100.0")},
+{"userName" : "t2823u1" , "location" : circle("2116.91,1638.54 100.0")},
+{"userName" : "t4153u1" , "location" : circle("3977.9,1303.22 100.0")},
+{"userName" : "t2589u1" , "location" : circle("2075.58,1455.78 100.0")},
+{"userName" : "c1459u1" , "location" : circle("2149.15,1104.3 100.0")},
+{"userName" : "p766u1" , "location" : circle("3144.94,1042.37 100.0")},
+{"userName" : "p593u1" , "location" : circle("3460.96,1141.22 100.0")},
+{"userName" : "p168u1" , "location" : circle("1719.0,2688.66 100.0")},
+{"userName" : "t4253u1" , "location" : circle("1575.25,0.0 100.0")},
+{"userName" : "t4177u1" , "location" : circle("2185.18,1002.25 100.0")},
+{"userName" : "p387u1" , "location" : circle("2351.74,811.8 100.0")},
+{"userName" : "t2571u1" , "location" : circle("2307.49,1915.48 100.0")},
+{"userName" : "c1513u1" , "location" : circle("3432.64,1067.27 100.0")},
+{"userName" : "p618u1" , "location" : circle("1682.87,1962.44 100.0")},
+{"userName" : "t2735u1" , "location" : circle("2999.73,1635.76 100.0")},
+{"userName" : "t4859u1" , "location" : circle("1763.96,348.87 100.0")},
+{"userName" : "w1848u1" , "location" : circle("2675.62,2150.98 100.0")},
+{"userName" : "t3306u1" , "location" : circle("2274.48,1312.75 100.0")},
+{"userName" : "t2558u1" , "location" : circle("2254.43,2174.64 100.0")},
+{"userName" : "p180u1" , "location" : circle("1253.66,1771.06 100.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
+
+insert into Reports
+(
+[
+{"Etype" : "flood" , "location" : circle("846.5,2589.56 1000.0")},
+{"Etype" : "crash" , "location" : circle("953.48,2504.12 100.0")},
+{"Etype" : "flood" , "location" : circle("2313.19,1641.15 1000.0")},
+{"Etype" : "fire" , "location" : circle("3014.66,2332.34 500.0")},
+{"Etype" : "flood" , "location" : circle("1188.75,2307.52 1000.0")},
+{"Etype" : "fire" , "location" : circle("3418.08,1090.2 500.0")},
+{"Etype" : "fire" , "location" : circle("1364.9,1434.81 500.0")},
+{"Etype" : "storm" , "location" : circle("1164.65,2088.5 2000.0")},
+{"Etype" : "flood" , "location" : circle("3582.04,974.89 1000.0")},
+{"Etype" : "flood" , "location" : circle("1016.15,846.6 1000.0")},
+{"Etype" : "flood" , "location" : circle("1416.4,1483.13 1000.0")},
+{"Etype" : "flood" , "location" : circle("2777.23,963.83 1000.0")},
+{"Etype" : "fire" , "location" : circle("3082.74,1746.62 500.0")},
+{"Etype" : "storm" , "location" : circle("1186.15,2283.96 2000.0")},
+{"Etype" : "crash" , "location" : circle("1218.17,1591.68 100.0")},
+{"Etype" : "fire" , "location" : circle("1141.24,1474.73 500.0")},
+{"Etype" : "flood" , "location" : circle("1105.73,1875.44 1000.0")},
+{"Etype" : "flood" , "location" : circle("1805.37,2346.03 1000.0")},
+{"Etype" : "flood" , "location" : circle("1535.83,1546.66 1000.0")},
+{"Etype" : "fire" , "location" : circle("4187.75,1064.17 500.0")}
+]
+);
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
new file mode 100644
index 0000000..d0f65e5
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.3.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+create index delivery on EmergencyChannelResults(deliveryTime);
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
new file mode 100644
index 0000000..5f764b3
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.4.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+15000
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
new file mode 100644
index 0000000..dd6e1ca
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.5.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+select value array_count(
+(select * from EmergencyChannelResults where deliveryTime > datetime("2017-05-02T17:52:59.570Z")));
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
new file mode 100644
index 0000000..54075e5
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/add_index/add_index.6.ddl.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Check Whether a Channel works after adding a new Index
+* Expected Res : Success
+* Date         : Apr 2018
+* Author       : Steven Jacobs
+*/
+
+use channels;
+
+drop channel EmergencyChannel;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
new file mode 100644
index 0000000..ca01dd7
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/drop_index/drop_index.1.ddl.sqlpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Drop Function Dataset Index
+* Expected Res : Error
+* Date         : Jan 2018
+* Author       : Steven Jacobs
+*/
+
+drop dataverse two if exists;
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  location: circle,
+  userName: string,
+  timeStamp: datetime
+};
+
+
+create type UserLocationFeedType as {
+  location: circle,
+  userName: string
+};
+
+create type EmergencyReport as {
+  reportId: uuid,
+  Etype: string,
+  location: circle,
+  timeStamp: datetime
+};
+
+create type EmergencyReportFeedType as {
+  Etype: string,
+  location: circle
+};
+
+
+create type EmergencyShelter as {
+  shelterName: string,
+  location: point
+};
+
+create dataset UserLocations(UserLocation)
+primary key userName;
+create dataset Shelters(EmergencyShelter)
+primary key shelterName;
+create dataset Reports(EmergencyReport)
+primary key reportId autogenerated;
+
+create index location_time on UserLocations(timeStamp);
+create index u_location on UserLocations(location) type RTREE;
+create index s_location on Shelters(location) type RTREE;
+create index report_time on Reports(timeStamp);
+
+create function RecentEmergenciesNearUser(userName) {
+  (
+  select report, shelters from
+   ( select value r from Reports r where r.timeStamp >
+   current_datetime() - day_time_duration("PT10S"))report,
+  UserLocations u
+    let shelters = (select s.location from Shelters s where spatial_intersect(s.location,u.location))
+  where u.userName = userName
+  and spatial_intersect(report.location,u.location)
+  )
+};
+
+create repetitive channel EmergencyChannel using RecentEmergenciesNearUser@1 period duration("PT10S");
+
+use channels;
+drop index Shelters.s_location;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
new file mode 100644
index 0000000..2e9bdd9
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/add_index/add_index.1.adm
@@ -0,0 +1 @@
+1074
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
index bee9157..225d83f 100644
--- 
a/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/create_channel_check_metadata/create_channel_check_metadata.1.adm
@@ -1 +1 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", 
"SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", 
"ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannelResults" ], [ "channels", 
"nearbyTweetChannelSubscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel", 
"SubscriptionsDatasetName": "nearbyTweetChannelSubscriptions", 
"ResultsDatasetName": "nearbyTweetChannelResults", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannelResults" ], [ "channels", 
"nearbyTweetChannelSubscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannelResults as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannelSubscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
index 1c492ac..8d8899d 100644
--- 
a/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/drop_channel_check_metadata/drop_channel_check_metadata.1.adm
@@ -1,2 +1,2 @@
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", 
"SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel1Results" ], [ "channels", 
"nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
-{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", 
"SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel3Results" ], [ "channels", 
"nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel1", 
"SubscriptionsDatasetName": "nearbyTweetChannel1Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel1Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel1Results" ], [ "channels", 
"nearbyTweetChannel1Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannel1Results as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannel1Subscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
+{ "DataverseName": "channels", "ChannelName": "nearbyTweetChannel3", 
"SubscriptionsDatasetName": "nearbyTweetChannel3Subscriptions", 
"ResultsDatasetName": "nearbyTweetChannel3Results", "Function": [ "channels", 
"NearbyTweetsContainingText", "2" ], "Duration": "PT10M", "Dependencies": [ [ [ 
"channels", "nearbyTweetChannel3Results" ], [ "channels", 
"nearbyTweetChannel3Subscriptions" ] ], [ [ "channels", 
"NearbyTweetsContainingText", "2" ] ] ], "Body": "SET inline_with 
\"false\";\ninsert into channels.nearbyTweetChannel3Results as a (\nwith 
channelExecutionTime as current_datetime() \nselect result, 
channelExecutionTime, sub.subscriptionId as subscriptionId,current_datetime() 
as deliveryTime\nfrom channels.nearbyTweetChannel3Subscriptions 
sub,\nMetadata.Broker b, 
\nchannels.NearbyTweetsContainingText(sub.param0,sub.param1) result \nwhere 
b.BrokerName = sub.BrokerName\nand b.DataverseName = sub.DataverseName\n) 
returning a;" }
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
index 4308c83..c41aec1 100644
--- 
a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
+++ 
b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm
@@ -1,6 +1,6 @@
-{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [  
], "ReturnType": "VOID", "Definition": "insert into channels.UserLocations([\n  
  {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n  )", "Language": 
"AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ 
 ] ] }
-{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", 
"Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "delete 
from channels.UserLocations\nwhere roomNumber = r\nor roomNumber = 
otherRoom\nand channels.really_contains(roomNumber,\"l\")", "Language": "AQL", 
"Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ 
"channels", "really_contains", "2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", 
"Params": [  ], "ReturnType": "VOID", "Definition": "insert into 
UserLocations([\n    {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n  
)", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", 
"UserLocations" ] ], [  ] ] }
-{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", 
"Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "delete 
from UserLocations\nwhere roomNumber = r\nor roomNumber = otherRoom\nand 
really_contains(roomNumber,\"l\")", "Language": "AQL", "Duration": "", 
"Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "really_contains", 
"2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", 
"Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "select 
roomNumber from UserLocations\nwhere roomNumber = r\nor roomNumber = 
otherRoom\nand really_contains(roomNumber,\"l\")\norder by id", "Language": 
"AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ 
"two", "really_contains", "2" ] ] ] }
-{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", 
"Params": [ "$r", "$otherRoom" ], "ReturnType": "VOID", "Definition": "select 
roomNumber from channels.UserLocations\nwhere roomNumber = r\nor roomNumber = 
otherRoom\nand channels.really_contains(roomNumber,\"l\")\norder by id", 
"Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", 
"UserLocations" ] ], [ [ "channels", "really_contains", "2" ] ] ] }
\ No newline at end of file
+{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [  
], "ReturnType": "VOID", "Definition": "use two;\ninsert into 
channels.UserLocations([\n    {\"timeStamp\":current_datetime(), 
\"roomNumber\":222}]\n  );", "Language": "AQL", "Duration": "", "Dependencies": 
[ [ [ "channels", "UserLocations" ] ], [  ] ] }
+{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", 
"Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use 
two;\ndelete from channels.UserLocations\nwhere roomNumber = 
get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand 
channels.really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": 
"", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", 
"really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", 
"get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", 
"Params": [  ], "ReturnType": "VOID", "Definition": "use two;\ninsert into 
UserLocations([\n    {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n  
);", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", 
"UserLocations" ] ], [  ] ] }
+{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", 
"Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use 
two;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor 
roomNumber = get_job_param(\"otherRoom\")\nand 
really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", 
"Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", 
"1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", 
"Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use 
two;\nselect roomNumber from UserLocations\nwhere roomNumber = 
get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand 
really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", 
"Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", 
"get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", 
"get_job_param", "1" ] ] ] }
+{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", 
"Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use 
two;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = 
get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand 
channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", 
"Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ 
"channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", 
"get_job_param", "1" ] ] ] }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml 
b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 3c72a14..4640af1 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -135,6 +135,17 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="channel">
+      <compilation-unit name="drop_index">
+        <output-dir compare="Text">drop_index</output-dir>
+        <expected-error>Cannot drop index. channels.EmergencyChannel(Channel) depends on it!</expected-error>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
+      <compilation-unit name="add_index">
+        <output-dir compare="Text">add_index</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
       <compilation-unit name="drop_results">
         <output-dir compare="Text">drop_results</output-dir>
         <expected-error>Cannot alter dataset two.nearbyTweetChannelResults. two.nearbyTweetChannel(Channel) depends on it!</expected-error>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2620
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If0a4d37a5b91063fcb1673dbfd008c140ed54ae6
Gerrit-PatchSet: 9
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sjaco...@ucr.edu>
Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>

Reply via email to