Steven Jacobs has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2046

Change subject: BAD updates build check
......................................................................

BAD updates build check

Change-Id: I999879b1cae0de179a1d3c232fa940228979f4fe
---
M asterix-bad/pom.xml
M asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M 
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
M asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
M 
asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
M 
asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
A 
asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
M 
asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
A 
asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
A 
asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
A 
asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
A 
asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
A 
asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
33 files changed, 795 insertions(+), 183 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad 
refs/changes/46/2046/1

diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 8738162..cdeb5bd 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -244,6 +244,11 @@
       <version>${asterix.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-lang-common</artifactId>
+      <version>${asterix.version}</version>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java 
b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index ae24e0e..22c09a9 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -23,7 +23,9 @@
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.Date;
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -46,15 +48,18 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(ChannelJobService.class.getName());
 
-    public static ScheduledExecutorService startJob(JobSpecification jobSpec, 
EnumSet<JobFlag> jobFlags, JobId jobId,
-            IHyracksClientConnection hcc, long duration)
+    public static ScheduledExecutorService startJob(JobSpecification jobSpec, 
EnumSet<JobFlag> jobFlags,
+            long distributedId,
+            IHyracksClientConnection hcc, long duration, Map<byte[], byte[]> 
jobParameters)
             throws Exception {
         ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
                 try {
-                    executeJob(jobSpec, jobFlags, jobId, hcc);
+                    if (!executeJob(jobSpec, jobFlags, distributedId, hcc, 
jobParameters, duration)) {
+                        scheduledExecutorService.shutdown();
+                    }
                 } catch (Exception e) {
                     LOGGER.log(Level.WARNING, "Channel Job Failed to run.", e);
                 }
@@ -63,15 +68,30 @@
         return scheduledExecutorService;
     }
 
-    public static void executeJob(JobSpecification jobSpec, EnumSet<JobFlag> 
jobFlags, JobId jobId,
-            IHyracksClientConnection hcc)
+    public static boolean executeJob(JobSpecification jobSpec, 
EnumSet<JobFlag> jobFlags, long distributedId,
+            IHyracksClientConnection hcc, Map<byte[], byte[]> jobParameters, 
long duration)
             throws Exception {
-        LOGGER.info("Executing Channel Job");
-        if (jobId == null) {
-            hcc.startJob(jobSpec, jobFlags);
-        } else {
-            hcc.startJob(jobId);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Executing Distributed Job");
         }
+        boolean onTime = true;
+        JobId jobId;
+        Date checkStartTime = new Date();
+        if (distributedId == -1) {
+            jobId = hcc.startJob(jobSpec, jobFlags);
+        } else {
+            jobId = hcc.startJob(distributedId, jobParameters);
+        }
+        hcc.waitForCompletion(jobId);
+        Date checkEndTime = new Date();
+        long executionMilliseconds = (checkEndTime.getTime() - 
checkStartTime.getTime());
+        if (executionMilliseconds > duration && 
LOGGER.isLoggable(Level.SEVERE)) {
+            LOGGER.severe("Periodic job was unable to meet the period of " + 
duration + " milliseconds. Actually took "
+                    + executionMilliseconds + " execution will shutdown" + new 
Date());
+            onTime = false;
+        }
+        return onTime;
+
     }
 
     public static void runChannelJob(JobSpecification channeljobSpec, 
IHyracksClientConnection hcc) throws Exception {
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 43aa161..5528962 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -39,7 +39,6 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 
 public class ChannelDropStatement implements IExtensionStatement {
 
@@ -109,11 +108,11 @@
             }
 
             listener.getExecutorService().shutdownNow();
-            JobId hyracksJobId = listener.getJobId();
+            long predistributedId = listener.getPredistributedId();
             listener.deActivate();
             activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
+            if (predistributedId != -1) {
+                hcc.destroyJob(predistributedId);
             }
 
             //Create a metadata provider to use in nested jobs.
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 5c92cb5..fc160e4 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -41,6 +41,7 @@
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
@@ -50,8 +51,10 @@
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
 import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.IndexedTypeExpression;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.CreateIndexStatement;
 import org.apache.asterix.lang.common.statement.DatasetDecl;
 import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
 import org.apache.asterix.lang.common.statement.InsertStatement;
@@ -69,11 +72,11 @@
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -196,11 +199,31 @@
                 new Identifier(BADConstants.BAD_DATAVERSE_NAME), 
resultsTypeName, null, null, null, null,
                 new HashMap<String, String>(), new HashMap<String, String>(), 
DatasetType.INTERNAL, idd, true);
 
+        //Create an index on timestamp for results
+        CreateIndexStatement createTimeIndex = new CreateIndexStatement();
+        createTimeIndex.setDatasetName(resultsName);
+        createTimeIndex.setDataverseName(new Identifier(dataverse));
+        createTimeIndex.setIndexName(new Identifier(resultsName + 
"TimeIndex"));
+        createTimeIndex.setIfNotExists(false);
+        createTimeIndex.setIndexType(IndexType.BTREE);
+        createTimeIndex.setEnforced(false);
+        createTimeIndex.setGramLength(0);
+        List<String> fNames = new ArrayList<>();
+        fNames.add(BADConstants.ChannelExecutionTime);
+        Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, 
null);
+        createTimeIndex.addFieldExprPair(fields);
+        createTimeIndex.addFieldIndexIndicator(0);
+
+
         //Run both statements to create datasets
         ((QueryTranslator) 
statementExecutor).handleCreateDatasetStatement(metadataProvider, 
createSubscriptionsDataset,
                 hcc);
         metadataProvider.getLocks().reset();
         ((QueryTranslator) 
statementExecutor).handleCreateDatasetStatement(metadataProvider, 
createResultsDataset, hcc);
+        metadataProvider.getLocks().reset();
+        
+        //Create a time index for the results
+        ((QueryTranslator) 
statementExecutor).handleCreateIndexStatement(metadataProvider, 
createTimeIndex, hcc);
 
     }
 
@@ -242,13 +265,13 @@
             PrecompiledJobEventListener listener, boolean predistributed) 
throws Exception {
         if (channeljobSpec != null) {
             //TODO: Find a way to fix optimizer tests so we don't need this 
check
-            JobId jobId = null;
+            long destributedId = -1;
             if (predistributed) {
-                jobId = hcc.distributeJob(channeljobSpec);
+                destributedId = hcc.distributeJob(channeljobSpec);
             }
             ScheduledExecutorService ses = 
ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class),
-                    jobId, hcc, ChannelJobService.findPeriod(duration));
-            listener.storeDistributedInfo(jobId, ses, null);
+                    destributedId, hcc, 
ChannelJobService.findPeriod(duration), new HashMap<>());
+            listener.storeDistributedInfo(destributedId, ses, null, null);
         }
 
     }
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 0666b38..793f3aa 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -20,7 +20,6 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.logging.Level;
@@ -29,11 +28,9 @@
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.algebra.extension.IExtensionStatement;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
-import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.app.translator.QueryTranslator;
 import org.apache.asterix.bad.BADConstants;
 import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.lang.BADParserFactory;
 import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
@@ -44,30 +41,35 @@
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.clause.LetClause;
 import org.apache.asterix.lang.common.expression.CallExpr;
 import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.expression.VariableExpr;
 import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.DeleteStatement;
 import org.apache.asterix.lang.common.statement.Query;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.lang.sqlpp.expression.SelectExpression;
 import org.apache.asterix.lang.sqlpp.visitor.SqlppDeleteRewriteVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.om.base.temporal.ADurationParserFactory;
+import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataset.IHyracksDataset;
 import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
 
@@ -76,24 +78,33 @@
     private static final Logger LOGGER = 
Logger.getLogger(CreateProcedureStatement.class.getName());
 
     private final FunctionSignature signature;
-    private final String functionBody;
+    private final String procedureBody;
+    private final Statement procedureBodyStatement;
     private final List<String> paramList;
+    private final List<VariableExpr> varList;
     private final CallExpr period;
     private String duration = "";
 
-    public String getFunctionBody() {
-        return functionBody;
-    }
-
-    public CreateProcedureStatement(FunctionSignature signature, 
List<VarIdentifier> parameterList, String functionBody,
-            Expression period) {
+    public CreateProcedureStatement(FunctionSignature signature, 
List<VarIdentifier> parameterList,
+            List<Integer> paramIds, String functionBody, Statement 
procedureBodyStatement, Expression period) {
         this.signature = signature;
-        this.functionBody = functionBody;
+        this.procedureBody = functionBody;
+        this.procedureBodyStatement = procedureBodyStatement;
         this.paramList = new ArrayList<>();
-        for (VarIdentifier varId : parameterList) {
-            this.paramList.add(varId.getValue());
+        this.varList = new ArrayList<>();
+        for (int i = 0; i < parameterList.size(); i++) {
+            this.paramList.add(parameterList.get(i).getValue());
+            this.varList.add(new VariableExpr(new 
VarIdentifier(parameterList.get(i).toString(), paramIds.get(i))));
         }
         this.period = (CallExpr) period;
+    }
+
+    public String getProcedureBody() {
+        return procedureBody;
+    }
+
+    public Statement getProcedureBodyStatement() {
+        return procedureBodyStatement;
     }
 
     @Override
@@ -103,6 +114,10 @@
 
     public List<String> getParamList() {
         return paramList;
+    }
+
+    public List<VariableExpr> getVarList() {
+        return varList;
     }
 
     public FunctionSignature getSignature() {
@@ -158,33 +173,46 @@
         return jobSpec;
     }
 
-    private Pair<JobSpecification, PrecompiledType> createProcedureJob(String 
body,
-            IStatementExecutor statementExecutor, MetadataProvider 
metadataProvider, IHyracksClientConnection hcc,
-            IHyracksDataset hdc, Stats stats) throws Exception {
-        StringBuilder builder = new StringBuilder();
-        builder.append(body);
-        builder.append(";");
-        BADParserFactory factory = new BADParserFactory();
-        List<Statement> fStatements = factory.createParser(new 
StringReader(builder.toString())).parse();
-        if (fStatements.size() > 1) {
-            throw new CompilationException("Procedure can only execute a 
single statement");
+    private void addLets(SelectExpression s) {
+        FunctionIdentifier function = BuiltinFunctions.GET_JOB_PARAMETER;
+        FunctionSignature sig =
+                new FunctionSignature(function.getNamespace(), 
function.getName(), function.getArity());
+        for (VariableExpr var : varList) {
+            List<Expression> strListForCall = new ArrayList<>();
+            LiteralExpr l = new LiteralExpr(new 
StringLiteral(var.getVar().getValue()));
+            strListForCall.add(l);
+            Expression con = new CallExpr(sig, strListForCall);
+            LetClause let = new LetClause(var, con);
+            s.getLetList().add(let);
         }
-        if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
+    }
+
+    private Pair<JobSpecification, PrecompiledType> 
createProcedureJob(IStatementExecutor statementExecutor,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc, 
IHyracksDataset hdc, Stats stats)
+                    throws Exception {
+        if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) {
+            if (!varList.isEmpty()) {
+                throw new CompilationException("Insert procedures cannot have 
parameters");
+            }
             return new Pair<>(
                     ((QueryTranslator) 
statementExecutor).handleInsertUpsertStatement(metadataProvider,
-                            fStatements.get(0), hcc, hdc, 
ResultDelivery.ASYNC, null, stats, true, null),
+                            getProcedureBodyStatement(), hcc, hdc, 
ResultDelivery.ASYNC, null, stats, true, null),
                     PrecompiledType.INSERT);
-        } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
-            Pair<JobSpecification, PrecompiledType> pair =
-                    new Pair<>(compileQueryJob(statementExecutor, 
metadataProvider, hcc, (Query) fStatements.get(0)),
-                            PrecompiledType.QUERY);
+        } else if (getProcedureBodyStatement().getKind() == 
Statement.Kind.QUERY) {
+            Query s = (Query) getProcedureBodyStatement();
+            addLets((SelectExpression) s.getBody());
+            Pair<JobSpecification, PrecompiledType> pair = new Pair<>(
+                    compileQueryJob(statementExecutor, metadataProvider, hcc, 
(Query) getProcedureBodyStatement()),
+                    PrecompiledType.QUERY);
             metadataProvider.getLocks().unlock();
             return pair;
-        } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
+        } else if (getProcedureBodyStatement().getKind() == 
Statement.Kind.DELETE) {
             SqlppDeleteRewriteVisitor visitor = new 
SqlppDeleteRewriteVisitor();
-            fStatements.get(0).accept(visitor, null);
+            getProcedureBodyStatement().accept(visitor, null);
+            DeleteStatement delete = (DeleteStatement) 
getProcedureBodyStatement();
+            addLets((SelectExpression) delete.getQuery().getBody());
             return new Pair<>(((QueryTranslator) 
statementExecutor).handleDeleteStatement(metadataProvider,
-                    fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+                    getProcedureBodyStatement(), hcc, true), 
PrecompiledType.DELETE);
         } else {
             throw new CompilationException("Procedure can only execute a 
single delete, insert, or query");
         }
@@ -193,8 +221,8 @@
     private void setupDistributedJob(EntityId entityId, JobSpecification 
jobSpec, IHyracksClientConnection hcc,
             PrecompiledJobEventListener listener, ResultSetId resultSetId, 
IHyracksDataset hdc, Stats stats)
             throws Exception {
-        JobId jobId = hcc.distributeJob(jobSpec);
-        listener.storeDistributedInfo(jobId, null, new ResultReader(hdc, 
jobId, resultSetId));
+        long predistributedId = hcc.distributeJob(jobSpec);
+        listener.storeDistributedInfo(predistributedId, null, hdc, 
resultSetId);
     }
 
     @Override
@@ -228,7 +256,7 @@
                 throw new AsterixException("Procedure " + signature.getName() 
+ " is already running");
             }
             procedure = new Procedure(dataverse, signature.getName(), 
signature.getArity(), getParamList(),
-                    Function.RETURNTYPE_VOID, getFunctionBody(), 
Function.LANGUAGE_AQL, duration);
+                    Function.RETURNTYPE_VOID, getProcedureBody(), 
Function.LANGUAGE_AQL, duration);
             MetadataProvider tempMdProvider = new 
MetadataProvider(metadataProvider.getApplicationContext(),
                     metadataProvider.getDefaultDataverse());
             tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
@@ -246,16 +274,18 @@
 
             //Create Procedure Internal Job
             Pair<JobSpecification, PrecompiledType> procedureJobSpec =
-                    createProcedureJob(getFunctionBody(), statementExecutor, 
tempMdProvider, hcc, hdc, stats);
+                    createProcedureJob(statementExecutor, tempMdProvider, hcc, 
hdc, stats);
 
             // Now we subscribe
             if (listener == null) {
                 //TODO: Add datasets used by channel function
-                listener = new PrecompiledJobEventListener(appCtx, entityId, 
procedureJobSpec.second, new ArrayList<>(),
+                listener = new PrecompiledJobEventListener(appCtx, entityId, 
procedureJobSpec.second,
+                        new ArrayList<>(),
                         null, "BadListener");
                 activeEventHandler.registerListener(listener);
             }
-            setupDistributedJob(entityId, procedureJobSpec.first, hcc, 
listener, tempMdProvider.getResultSetId(), hdc,
+            setupDistributedJob(entityId, procedureJobSpec.first, hcc, 
listener, tempMdProvider.getResultSetId(),
+                    hdc,
                     stats);
 
             MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure);
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
index 0dbd0a3..9bf8583 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -18,7 +18,11 @@
  */
 package org.apache.asterix.bad.lang.statement;
 
+import java.io.DataOutput;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.asterix.active.EntityId;
@@ -34,12 +38,20 @@
 import 
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
 import org.apache.asterix.bad.metadata.Procedure;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.lang.common.base.Expression;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
 import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.base.IAObject;
+import org.apache.asterix.translator.ConstantHelper;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -48,17 +60,20 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public class ExecuteProcedureStatement implements IExtensionStatement {
 
     private final String dataverseName;
     private final String procedureName;
     private final int arity;
+    private final List<Expression> argList;
 
-    public ExecuteProcedureStatement(String dataverseName, String 
procedureName, int arity) {
+    public ExecuteProcedureStatement(String dataverseName, String 
procedureName, int arity, List<Expression> argList) {
         this.dataverseName = dataverseName;
         this.procedureName = procedureName;
         this.arity = arity;
+        this.argList = argList;
     }
 
     public String getDataverseName() {
@@ -109,22 +124,27 @@
             if (procedure == null) {
                 throw new AlgebricksException("There is no procedure with this 
name " + procedureName + ".");
             }
-
-            JobId hyracksJobId = listener.getJobId();
+            Map<byte[], byte[]> contextRuntimeVarMap = 
createContextRuntimeMap(procedure);
+            long predistributedId = listener.getPredistributedId();
             if (procedure.getDuration().equals("")) {
-                hcc.startJob(hyracksJobId);
+                JobId jobId = hcc.startJob(predistributedId, 
contextRuntimeVarMap);
 
                 if (listener.getType() == PrecompiledType.QUERY) {
-                    hcc.waitForCompletion(hyracksJobId);
-                    ResultReader resultReader = listener.getResultReader();
+                    hcc.waitForCompletion(jobId);
+                    //ResultReader resultReader = new ResultReader(hdc, jobId, 
metadataProvider.getResultSetId());
+                    ResultReader resultReader =
+                            new ResultReader(listener.getResultDataset(), 
jobId, listener.getResultId());
+
                     ResultUtil.printResults(appCtx, resultReader,
                             ((QueryTranslator) 
statementExecutor).getSessionOutput(), new Stats(), null);
                 }
 
             } else {
-                ScheduledExecutorService ses = 
ChannelJobService.startJob(null, EnumSet.noneOf(JobFlag.class),
-                        hyracksJobId, hcc, 
ChannelJobService.findPeriod(procedure.getDuration()));
-                listener.storeDistributedInfo(hyracksJobId, ses, 
listener.getResultReader());
+                ScheduledExecutorService ses =
+                        ChannelJobService.startJob(null, 
EnumSet.noneOf(JobFlag.class), predistributedId, hcc,
+                                
ChannelJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap);
+                listener.storeDistributedInfo(predistributedId, ses, 
listener.getResultDataset(),
+                        listener.getResultId());
             }
 
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -140,4 +160,43 @@
         }
     }
 
+    private Map<byte[], byte[]> createContextRuntimeMap(Procedure procedure)
+            throws AsterixException, HyracksDataException {
+        Map<byte[], byte[]> map = new HashMap<>();
+        if (procedure.getParams().size() != argList.size()) {
+            throw 
AsterixException.create(ErrorCode.COMPILATION_INVALID_PARAMETER_NUMBER,
+                    procedure.getEntityId().getEntityName(), argList.size());
+        }
+        ArrayBackedValueStorage abvsKey = new ArrayBackedValueStorage();
+        DataOutput dosKey = abvsKey.getDataOutput();
+        ArrayBackedValueStorage abvsValue = new ArrayBackedValueStorage();
+        DataOutput dosValue = abvsValue.getDataOutput();
+
+        for (int i = 0; i < procedure.getParams().size(); i++) {
+            if (!(argList.get(i) instanceof LiteralExpr)) {
+                //TODO handle nonliteral arguments to procedure
+                throw AsterixException.create(ErrorCode.TYPE_UNSUPPORTED, 
procedure.getEntityId().getEntityName(),
+                        argList.get(i).getClass());
+            }
+            //Turn the argument name into a byte array
+            IAObject str = new AString(procedure.getParams().get(i));
+            abvsKey.reset();
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(str.getType()).serialize(str,
 dosKey);
+            //We do not save the type tag of the string key
+            byte[] key = new byte[abvsKey.getLength() - 1];
+            System.arraycopy(abvsKey.getByteArray(), 1, key, 0, 
abvsKey.getLength() - 1);
+
+            //Turn the argument value into a byte array
+            IAObject object = ConstantHelper.objectFromLiteral(((LiteralExpr) 
argList.get(i)).getValue());
+            abvsValue.reset();
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(object.getType()).serialize(object,
+                    dosValue);
+            byte[] value = new byte[abvsValue.getLength()];
+            System.arraycopy(abvsValue.getByteArray(), 
abvsValue.getStartOffset(), value, 0, abvsValue.getLength());
+
+            map.put(key, value);
+        }
+        return map;
+    }
+
 }
\ No newline at end of file
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
index 3c618ae..16042eb 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -39,7 +39,6 @@
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 
 public class ProcedureDropStatement implements IExtensionStatement {
 
@@ -109,11 +108,11 @@
             if (listener.getExecutorService() != null) {
                 listener.getExecutorService().shutdownNow();
             }
-            JobId hyracksJobId = listener.getJobId();
+            long predistributedId = listener.getPredistributedId();
             listener.deActivate();
             activeEventHandler.unregisterListener(listener);
-            if (hyracksJobId != null) {
-                hcc.destroyJob(hyracksJobId);
+            if (predistributedId != -1) {
+                hcc.destroyJob(predistributedId);
             }
 
             //Remove the Channel Metadata
diff --git 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 5036549..c36e60e 100644
--- 
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
+++ 
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -30,11 +30,12 @@
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.metadata.IDataset;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.log4j.Level;
@@ -43,6 +44,7 @@
 public class PrecompiledJobEventListener implements 
IActiveEntityEventsListener {
 
     private static final Logger LOGGER = 
Logger.getLogger(PrecompiledJobEventListener.class);
+
 
     public enum PrecompiledType {
         CHANNEL,
@@ -57,9 +59,13 @@
         FINISHED
     }
 
+    private long predistributedId;
     private ScheduledExecutorService executorService = null;
     private ResultReader resultReader;
     private final PrecompiledType type;
+
+    private IHyracksDataset hdc;
+    private ResultSetId resultSetId;
     // members
     protected volatile ActivityState state;
     protected JobId jobId;
@@ -92,8 +98,21 @@
         this.type = type;
     }
 
+
+    public IHyracksDataset getResultDataset() {
+        return hdc;
+    }
+
+    public ResultSetId getResultId() {
+        return resultSetId;
+    }
+
+    public long getPredistributedId() {
+        return predistributedId;
+    }
+
     protected synchronized void handle(ActivePartitionMessage message) {
-        if (message.getEvent() == Event.RUNTIME_REGISTERED) {
+        if (message.getEvent() == 
ActivePartitionMessage.Event.RUNTIME_REGISTERED) {
             numRegistered++;
             if (numRegistered == locations.getLocations().length) {
                 state = ActivityState.RUNNING;
@@ -172,10 +191,12 @@
         return type;
     }
 
-    public void storeDistributedInfo(JobId jobId, ScheduledExecutorService 
ses, ResultReader resultReader) {
-        this.jobId = jobId;
+    public void storeDistributedInfo(long predistributedId, 
ScheduledExecutorService ses,
+            IHyracksDataset hdc, ResultSetId resultSetId) {
+        this.predistributedId = predistributedId;
         this.executorService = ses;
-        this.resultReader = resultReader;
+        this.hdc = hdc;
+        this.resultSetId = resultSetId;
     }
 
     public ScheduledExecutorService getExecutorService() {
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt 
b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 6a06ea6..c6f7779 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -124,6 +124,7 @@
   FunctionName fctName = null;
   FunctionSignature signature;
   List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
+  List<Integer> paramIds = new ArrayList<Integer>();
   String functionBody;
   Token beginPos;
   Token endPos;
@@ -135,7 +136,13 @@
      paramList = ParameterList()
     <LEFTBRACE>
   {
-     beginPos = token;
+    for (VarIdentifier param : paramList)
+    {
+      VarIdentifier v = new VarIdentifier(param.toString());
+      getCurrentScope().addNewVarSymbolToScope(v);
+      paramIds.add(v.getId());
+    }
+    beginPos = token;
   }
   functionBodyExpr = SingleStatement() <RIGHTBRACE>
     {
@@ -146,7 +153,7 @@
     }
   ("period" period = FunctionCallExpr())?
   {
-  return new CreateProcedureStatement(signature, paramList, functionBody, 
period);
+  return new CreateProcedureStatement(signature, paramList, paramIds, 
functionBody, functionBodyExpr, period);
   }
 }
 
@@ -176,7 +183,7 @@
   )*)? <RIGHTPAREN>
     {
       String fqFunctionName =  funcName.function;
-      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, 
arity);
+      return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName, 
arity, argList);
     }
 }
 
diff --git 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
index 65e7dbc..a352739 100644
--- 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
+++ 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-create.plan
@@ -16,42 +16,46 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  
|PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  
|PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  
|PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN 
[$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- 
ASSIGN  |UNPARTITIONED|
-                                                                          -- 
EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  
|PARTITIONED|
-                                                      -- STREAM_PROJECT  
|PARTITIONED|
-                                                        -- ASSIGN  
|PARTITIONED|
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
+                                                          -- NESTED_LOOP  
|PARTITIONED|
                                                             -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
                                                                 -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
+                                                                  -- 
HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                               
 -- ASSIGN  |UNPARTITIONED|
+                                                                               
   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  
|PARTITIONED|
+                                                                  -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                    -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
index 06630e6..7ad21f6 100644
--- 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
+++ 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-subscribe.plan
@@ -16,45 +16,49 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  
|PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  
|PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  
|PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN 
[$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- 
ASSIGN  |UNPARTITIONED|
-                                                                          -- 
EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  
|PARTITIONED|
-                                                      -- STREAM_PROJECT  
|PARTITIONED|
-                                                        -- ASSIGN  
|PARTITIONED|
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
+                                                          -- NESTED_LOOP  
|PARTITIONED|
                                                             -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
                                                                 -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  -- 
HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                               
 -- ASSIGN  |UNPARTITIONED|
+                                                                               
   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  
|PARTITIONED|
+                                                                  -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                    -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
diff --git 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
index b3f4c51..3338e79 100644
--- 
a/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
+++ 
b/asterix-bad/src/test/resources/optimizerts/results/channel/channel-unsubscribe.plan
@@ -16,45 +16,49 @@
                       -- COMMIT  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- INSERT_DELETE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
-                                -- ASSIGN  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
-                                    -- ASSIGN  |PARTITIONED|
-                                      -- STREAM_PROJECT  |PARTITIONED|
+                            -- INDEX_INSERT_DELETE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                    -- INSERT_DELETE  |PARTITIONED|
+                                      -- HASH_PARTITION_EXCHANGE [$$32]  
|PARTITIONED|
                                         -- ASSIGN  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
                                             -- ASSIGN  |PARTITIONED|
                                               -- STREAM_PROJECT  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
-                                                  -- NESTED_LOOP  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  
|PARTITIONED|
+                                                -- ASSIGN  |PARTITIONED|
+                                                  -- STREAM_PROJECT  
|PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
                                                       -- STREAM_PROJECT  
|PARTITIONED|
                                                         -- ONE_TO_ONE_EXCHANGE 
 |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN 
[$$44, $$42][$$38, $$39]  |PARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                        -- 
ASSIGN  |UNPARTITIONED|
-                                                                          -- 
EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                            -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
-                                                              -- 
STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  
|PARTITIONED|
-                                                                  -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- 
DATASOURCE_SCAN  |PARTITIONED|
-                                                                      -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                    -- BROADCAST_EXCHANGE  
|PARTITIONED|
-                                                      -- STREAM_PROJECT  
|PARTITIONED|
-                                                        -- ASSIGN  
|PARTITIONED|
-                                                          -- STREAM_PROJECT  
|PARTITIONED|
+                                                          -- NESTED_LOOP  
|PARTITIONED|
                                                             -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
                                                                 -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                  -- 
HYBRID_HASH_JOIN [$$44, $$42][$$38, $$39]  |PARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$44, $$42]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                               
 -- ASSIGN  |UNPARTITIONED|
+                                                                               
   -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                                    -- 
HASH_PARTITION_EXCHANGE [$$38, $$39]  |PARTITIONED|
+                                                                      -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                        -- 
ASSIGN  |PARTITIONED|
+                                                                          -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                              
-- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                               
 -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                            -- 
BROADCAST_EXCHANGE  |PARTITIONED|
+                                                              -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                -- ASSIGN  
|PARTITIONED|
+                                                                  -- 
STREAM_PROJECT  |PARTITIONED|
+                                                                    -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- 
DATASOURCE_SCAN  |PARTITIONED|
+                                                                        -- 
ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                          -- 
EMPTY_TUPLE_SOURCE  |PARTITIONED|
 -- COMMIT  |PARTITIONED|
   -- STREAM_PROJECT  |PARTITIONED|
     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
new file mode 100644
index 0000000..6f6b764
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.1.ddl.sqlpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 17
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+
+create type UserLocation as {
+  recordId: uuid,
+  latitude: double,
+  longitude: double,
+  user_id: int,
+  timestamp: datetime
+};
+
+create type EmergencyShelter as {
+  name: string,
+  location: point
+};
+
+create type EmergencyReport as {
+  reportId: uuid,
+  impactZone: circle,
+  timestamp: datetime,
+  emergencyType: string
+};
+
+//create datasets
+create dataset EmergencyReports(EmergencyReport) primary key reportId 
autogenerated;
+create dataset UserLocations(UserLocation) primary key recordId autogenerated;
+create dataset EmergencyShelters(EmergencyShelter) primary key name;
+
+create broker brokerA at "http://www.notifyA.com";;
+
+create function EmergenciesNearMe(emergencyType, uid){
+  (with tenMinutesAgo as current_datetime() - day_time_duration("PT10S")
+  select report as report, shelters as shelters
+  from EmergencyReports report, UserLocations user
+  let shelters = (select * from EmergencyShelters shelter
+    where spatial_intersect(report.impactZone,shelter.location))
+  where user.user_id = uid 
+    and report.timestamp >= tenMinutesAgo
+    and user.timestamp >= tenMinutesAgo
+    and report.emergencyType = emergencyType
+    and 
spatial_intersect(report.impactZone,create_point(user.latitude,user.longitude)))
+};
+
+create repetitive channel EmergenciesNearMeChannel using EmergenciesNearMe@2 
period duration("PT5S");
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
new file mode 100644
index 0000000..b1a132b
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 17
+* Author       : Steven Jacobs
+*/
+use channels;
+
+subscribe to EmergenciesNearMeChannel("tornado", 1) on brokerA;
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
new file mode 100644
index 0000000..292151e
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.3.update.sqlpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 17
+* Author       : Steven Jacobs
+*/
+
+use channels;
+upsert into UserLocations([
+{"user_id":1, "latitude":5, "longitude":5, "timestamp":current_datetime()},
+{"user_id":2, "latitude":10, "longitude":10, "timestamp":current_datetime()},
+{"user_id":3, "latitude":15, "longitude":15, "timestamp":current_datetime()}]
+);
+upsert into EmergencyShelters([
+{"name":"A", "location":create_point(5.0,5.0)},
+{"name":"B", "location":create_point(10.0,10.0)},
+{"name":"C", "location":create_point(15.0,15.0)}]
+);
+upsert into EmergencyReports([
+{"emergencyType":"tornado", "impactZone":create_circle(create_point(5.0,6.0), 
10.0), "timestamp":current_datetime()},
+{"emergencyType":"flood", "impactZone":create_circle(create_point(5.0,6.0), 
5.0), "timestamp":current_datetime()},
+{"emergencyType":"tornado", 
"impactZone":create_circle(create_point(30.0,70.0), 5.0), 
"timestamp":current_datetime()}]
+);
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
new file mode 100644
index 0000000..67c5efa
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.4.sleep.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 17
+* Author       : Steven Jacobs
+*/
+
+5000
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
new file mode 100644
index 0000000..2ed5d46
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/disasters_with_friends/disasters_with_friends.5.query.sqlpp
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Disasters with friends
+* Expected Res : Success
+* Date         : May 17
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select result.result.shelters
+from EmergenciesNearMeChannelResults result
+order by name;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
index 42fd4aa..11b7b33 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.1.ddl.sqlpp
@@ -35,12 +35,12 @@
 create dataset UserLocations(userLocation)
 primary key userId;
 
-create function RoomOccupants($room) {
-    for $location in dataset UserLocations
-    where $location.roomNumber = $room
-    return $location.userId
+create function RoomOccupants(room) {
+  (select location.userId
+  from UserLocations location
+  where location.roomNumber = room)
 };
 
 create broker brokerA at "http://www.notifyA.com";;
 
-create repetitive channel roomRecords using RoomOccupants@1 period 
duration("PT1S");
+create repetitive channel roomRecords using RoomOccupants@1 period 
duration("PT30S");
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
index 270326b..d5f4290 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.4.sleep.sqlpp
@@ -22,4 +22,4 @@
 * Date         : Sep 2016
 * Author       : Steven Jacobs
 */
-600000
+630000
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
index 8db3fde..cfe92c9 100644
--- 
a/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/channel/ten_minute_channel/ten_minute_channel.5.query.sqlpp
@@ -25,6 +25,4 @@
 
 use channels;
 
-count (from $result in dataset roomRecordsResults
-order by $result.result
-select $result.result) > 599;
+(select value count(result) from roomRecordsResults)[0] > 19;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
new file mode 100644
index 0000000..42b0d40
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.ddl.sqlpp
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+create type myLocation as {
+  id: int
+};
+create dataset UserLocations(myLocation)
+primary key id;
+insert into UserLocations(
+  [{"id":0, "roomNumber":4815162342},
+  {"id":1, "roomNumber":"lost"},
+  {"id":2, "roomNumber":108},
+  {"id":3, "roomNumber":"jacob"}]
+);
+create procedure deleteSome(r, otherRoom) {
+delete from UserLocations
+where roomNumber = r
+or roomNumber = otherRoom
+};
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
new file mode 100644
index 0000000..88166bb
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value count(roomNumber) from UserLocations;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
new file mode 100644
index 0000000..8d03794
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.3.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute deleteSome(108,"jacob");
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
new file mode 100644
index 0000000..88166bb
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.4.query.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Delete Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+select value count(roomNumber) from UserLocations;
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
new file mode 100644
index 0000000..427a423
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.ddl.sqlpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use channels;
+create type myLocation as {
+  id: int
+};
+create dataset UserLocations(myLocation)
+primary key id;
+insert into UserLocations(
+  [{"id":0, "roomNumber":4815162342},
+  {"id":1, "roomNumber":"lost"},
+  {"id":2, "roomNumber":108},
+  {"id":3, "roomNumber":"jacob"}]
+);
+create procedure selectSome(r, otherRoom) {
+select roomNumber from UserLocations
+where roomNumber = r
+or roomNumber = otherRoom
+order by id;
+};
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
new file mode 100644
index 0000000..aa0722a
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute selectSome(108,"jacob");
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
new file mode 100644
index 0000000..aa0722a
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure_with_parameters/query_procedure_with_parameters.3.update.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+* Description  : Simple Query Procedure with parameters
+* Expected Res : Success
+* Date         : May 2017
+* Author       : Steven Jacobs
+*/
+
+use channels;
+execute selectSome(108,"jacob");
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
new file mode 100644
index 0000000..aa62b87
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/channel/disasters_with_friends/disasters_with_friends.1.adm
@@ -0,0 +1 @@
+{ "shelters": [ { "shelter": { "name": "A", "location": point("5.0,5.0") } }, 
{ "shelter": { "name": "B", "location": point("10.0,10.0") } } ] }
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
new file mode 100644
index 0000000..bf0d87a
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.1.adm
@@ -0,0 +1 @@
+4
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
 
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure_with_parameters/delete_procedure_with_parameters.2.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
 
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
new file mode 100644
index 0000000..bee5525
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.1.adm
@@ -0,0 +1,2 @@
+{ "roomNumber": 108 }
+{ "roomNumber": "jacob" }
\ No newline at end of file
diff --git 
a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
 
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
new file mode 100644
index 0000000..bee5525
--- /dev/null
+++ 
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure_with_parameters/query_procedure_with_parameters.2.adm
@@ -0,0 +1,2 @@
+{ "roomNumber": 108 }
+{ "roomNumber": "jacob" }
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml 
b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 12d7d55..8ca1171 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -37,8 +37,18 @@
         </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
+        <compilation-unit name="delete_procedure_with_parameters">
+            <output-dir 
compare="Text">delete_procedure_with_parameters</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
         <compilation-unit name="query_procedure">
             <output-dir compare="Text">query_procedure</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="procedure">
+        <compilation-unit name="query_procedure_with_parameters">
+            <output-dir 
compare="Text">query_procedure_with_parameters</output-dir>
         </compilation-unit>
     </test-case>
     <test-case FilePath="procedure">
@@ -71,10 +81,15 @@
             <output-dir 
compare="Text">subscribe_channel_check_subscriptions</output-dir>
         </compilation-unit>
     </test-case>
-    <!-- <test-case FilePath="channel">
+    <test-case FilePath="channel">
+        <compilation-unit name="disasters_with_friends">
+            <output-dir compare="Text">disasters_with_friends</output-dir>
+        </compilation-unit>
+    </test-case>
+    <test-case FilePath="channel">
         <compilation-unit name="ten_minute_channel">
             <output-dir compare="Text">ten_minute_channel</output-dir>
         </compilation-unit>
-    </test-case> -->
+    </test-case>
   </test-group>
 </test-suite>

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2046
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I999879b1cae0de179a1d3c232fa940228979f4fe
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>

Reply via email to