Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 0657aeeef -> 8f3cd25e3


PHOENIX-2198 Support correlate variable


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f3cd25e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f3cd25e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f3cd25e

Branch: refs/heads/4.x-HBase-0.98
Commit: 8f3cd25e370c133311a9042e2ba0e2f37c713000
Parents: 0657aee
Author: maryannxue <wei....@intel.com>
Authored: Wed Sep 9 17:16:50 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Wed Sep 9 17:16:50 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/execute/AggregatePlan.java   |   9 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  22 +-
 .../apache/phoenix/execute/CorrelatePlan.java   | 208 +++++++++++++
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   9 +-
 .../execute/LiteralResultIterationPlan.java     |  11 +-
 .../apache/phoenix/execute/RuntimeContext.java  |  33 ++
 .../phoenix/execute/RuntimeContextImpl.java     |  86 ++++++
 .../org/apache/phoenix/execute/ScanPlan.java    |   9 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   8 +-
 .../CorrelateVariableFieldAccessExpression.java |  75 +++++
 .../phoenix/expression/ExpressionType.java      |  11 +-
 .../phoenix/expression/ExpressionType.java.orig | 298 -------------------
 .../visitor/CloneExpressionVisitor.java         |   6 +
 .../expression/visitor/ExpressionVisitor.java   |   2 +
 .../StatelessTraverseAllExpressionVisitor.java  |   7 +
 .../StatelessTraverseNoExpressionVisitor.java   |   7 +
 .../UngroupedAggregatingResultIterator.java     |   3 +-
 .../phoenix/execute/CorrelatePlanTest.java      | 248 +++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   6 +-
 21 files changed, 744 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 67222d3..9a415b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -77,7 +77,14 @@ public class AggregatePlan extends BaseQueryPlan {
             StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
             Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, GroupBy groupBy,
             Expression having) {
-        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, 
parallelIteratorFactory);
+        this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, groupBy, having, null);
+    }
+    
+    private AggregatePlan(
+            StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
+            Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory, GroupBy groupBy,
+            Expression having, Expression dynamicFilter) {
+        super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, 
parallelIteratorFactory, dynamicFilter);
         this.having = having;
         this.aggregators = context.getAggregationManager().getAggregators();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index f14f574..5e58cbd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -40,7 +40,9 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
@@ -93,12 +95,19 @@ public abstract class BaseQueryPlan implements QueryPlan {
     protected final Integer limit;
     protected final OrderBy orderBy;
     protected final GroupBy groupBy;
-    protected final ParallelIteratorFactory parallelIteratorFactory;
+    protected final ParallelIteratorFactory parallelIteratorFactory;    
+    /*
+     * The filter expression that contains 
CorrelateVariableFieldAccessExpression
+     * and will have impact on the ScanRanges. It will recompiled at runtime 
+     * immediately before creating the ResultIterator.
+     */
+    protected final Expression dynamicFilter;
 
     protected BaseQueryPlan(
             StatementContext context, FilterableStatement statement, TableRef 
table,
             RowProjector projection, ParameterMetaData paramMetaData, Integer 
limit, OrderBy orderBy,
-            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) {
+            GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory,
+            Expression dynamicFilter) {
         this.context = context;
         this.statement = statement;
         this.tableRef = table;
@@ -108,6 +117,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         this.orderBy = orderBy;
         this.groupBy = groupBy;
         this.parallelIteratorFactory = parallelIteratorFactory;
+        this.dynamicFilter = dynamicFilter;
     }
 
     @Override
@@ -141,6 +151,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
     public RowProjector getProjector() {
         return projection;
     }
+    
+    public Expression getDynamicFilter() {
+        return dynamicFilter;
+    }
 
 //    /**
 //     * Sets up an id used to do round robin queue processing on the server
@@ -175,6 +189,10 @@ public abstract class BaseQueryPlan implements QueryPlan {
         Scan scan = context.getScan();
         PTable table = context.getCurrentTable().getTable();
         
+        if (dynamicFilter != null) {
+            WhereCompiler.compile(context, statement, null, 
Collections.singletonList(dynamicFilter), false, null);            
+        }
+        
         if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) {
             ScanUtil.setReversed(scan);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
new file mode 100644
index 0000000..1b0af8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+
+public class CorrelatePlan extends DelegateQueryPlan {    
+    private final QueryPlan rhs;
+    private final String variableId;
+    private final JoinType joinType;
+    private final boolean isSingleValueOnly;
+    private final RuntimeContext runtimeContext;
+    private final KeyValueSchema joinedSchema;
+    private final KeyValueSchema lhsSchema;
+    private final KeyValueSchema rhsSchema;
+    private final int rhsFieldPosition;
+
+    public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, 
+            JoinType joinType, boolean isSingleValueOnly, 
+            RuntimeContext runtimeContext, PTable joinedTable, 
+            PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
+        super(lhs);
+        if (joinType != JoinType.Inner && joinType != JoinType.Left && 
joinType != JoinType.Semi && joinType != JoinType.Anti)
+            throw new IllegalArgumentException("Unsupported join type '" + 
joinType + "' by CorrelatePlan");
+        
+        this.rhs = rhs;
+        this.variableId = variableId;
+        this.joinType = joinType;
+        this.isSingleValueOnly = isSingleValueOnly;
+        this.runtimeContext = runtimeContext;
+        this.joinedSchema = buildSchema(joinedTable);
+        this.lhsSchema = buildSchema(lhsTable);
+        this.rhsSchema = buildSchema(rhsTable);
+        this.rhsFieldPosition = rhsFieldPosition;
+    }
+
+    private static KeyValueSchema buildSchema(PTable table) {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        if (table != null) {
+            for (PColumn column : table.getColumns()) {
+                if (!SchemaUtil.isPKColumn(column)) {
+                    builder.addField(column);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> steps = Lists.newArrayList();
+        steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + 
") TABLES");
+        for (String step : delegate.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : 
""));
+        for (String step : rhs.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        return new ExplainPlan(steps);
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return iterator(DefaultParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+            throws SQLException {
+        return new ResultIterator() {
+            private final ValueBitSet destBitSet = 
ValueBitSet.newInstance(joinedSchema);
+            private final ValueBitSet lhsBitSet = 
ValueBitSet.newInstance(lhsSchema);
+            private final ValueBitSet rhsBitSet = 
+                    (joinType == JoinType.Semi || joinType == JoinType.Anti) ?
+                            ValueBitSet.EMPTY_VALUE_BITSET 
+                          : ValueBitSet.newInstance(rhsSchema);
+            private final ResultIterator iter = delegate.iterator();
+            private ResultIterator rhsIter = null;
+            private Tuple current = null;
+            private boolean closed = false;
+
+            @Override
+            public void close() throws SQLException {
+                if (!closed) {
+                    closed = true;
+                    iter.close();
+                    if (rhsIter != null) {
+                        rhsIter.close();
+                    }
+                }
+            }
+
+            @Override
+            public Tuple next() throws SQLException {
+                if (closed)
+                    return null;
+                
+                Tuple rhsCurrent = null;
+                if (rhsIter != null) {
+                    rhsCurrent = rhsIter.next();
+                    if (rhsCurrent == null) {
+                        rhsIter.close();
+                        rhsIter = null;
+                    } else if (isSingleValueOnly) {
+                        throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+                    }
+                }
+                while (rhsIter == null) {
+                    current = iter.next();
+                    if (current == null) {
+                        close();
+                        return null;
+                    }
+                    runtimeContext.setCorrelateVariableValue(variableId, 
current);
+                    rhsIter = rhs.iterator();
+                    rhsCurrent = rhsIter.next();
+                    if ((rhsCurrent == null && (joinType == JoinType.Inner || 
joinType == JoinType.Semi))
+                            || (rhsCurrent != null && joinType == 
JoinType.Anti)) {
+                        rhsIter.close();
+                        rhsIter = null;
+                    }
+                }
+                
+                Tuple joined;
+                try {
+                    joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
+                            current : TupleProjector.mergeProjectedValue(
+                                    convertLhs(current), joinedSchema, 
destBitSet,
+                                    rhsCurrent, rhsSchema, rhsBitSet, 
rhsFieldPosition);
+                } catch (IOException e) {
+                    throw new SQLException(e);
+                }
+                                
+                if ((joinType == JoinType.Semi || rhsCurrent == null) && 
rhsIter != null) {
+                    rhsIter.close();
+                    rhsIter = null;
+                }
+                
+                return joined;
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+            }
+            
+            private ProjectedValueTuple convertLhs(Tuple lhs) throws 
IOException {
+                ProjectedValueTuple t;
+                if (lhs instanceof ProjectedValueTuple) {
+                    t = (ProjectedValueTuple) lhs;
+                } else {
+                    ImmutableBytesWritable ptr = getContext().getTempPtr();
+                    TupleProjector.decodeProjectedValue(lhs, ptr);
+                    lhsBitSet.clear();
+                    lhsBitSet.or(ptr);
+                    int bitSetLen = lhsBitSet.getEstimatedLength();
+                    t = new ProjectedValueTuple(lhs, 
lhs.getValue(0).getTimestamp(), 
+                            ptr.get(), ptr.getOffset(), ptr.getLength(), 
bitSetLen);
+
+                }
+                return t;
+            }
+        };
+    }
+
+    @Override
+    public Integer getLimit() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 98eb2dd..21b25d6 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -37,7 +37,7 @@ import org.apache.phoenix.schema.TableRef;
 public class DegenerateQueryPlan extends BaseQueryPlan {
 
     public DegenerateQueryPlan(StatementContext context, FilterableStatement 
statement, TableRef table) {
-        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, 
PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, 
OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null);
+        super(context, statement, table, RowProjector.EMPTY_PROJECTOR, 
PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, 
OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
         context.setScanRanges(ScanRanges.NOTHING);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 2ac728d..72920b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -83,9 +83,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
+    private final int maxServerCacheTimeToLive;
     private List<SQLCloseable> dependencies;
     private HashCacheClient hashClient;
-    private int maxServerCacheTimeToLive;
     private AtomicLong firstJobEndTime;
     private List<Expression> keyRangeExpressions;
     
@@ -114,6 +114,8 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
+        this.maxServerCacheTimeToLive = 
plan.getContext().getConnection().getQueryServices().getProps().getInt(
+                QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
     }
     
     @Override
@@ -130,8 +132,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
         List<Future<Object>> futures = 
Lists.<Future<Object>>newArrayListWithExpectedSize(count);
         dependencies = Lists.newArrayList();
         if (joinInfo != null) {
-            hashClient = new 
HashCacheClient(delegate.getContext().getConnection());
-            maxServerCacheTimeToLive = 
services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
 QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            hashClient = hashClient != null ? 
+                    hashClient 
+                  : new HashCacheClient(delegate.getContext().getConnection());
             firstJobEndTime = new AtomicLong(0);
             keyRangeExpressions = new CopyOnWriteArrayList<Expression>();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index e7230cc..ab13e6c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -39,20 +39,20 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 public class LiteralResultIterationPlan extends BaseQueryPlan {
-    protected final Iterator<Tuple> tupleIterator;
+    protected final Iterable<Tuple> tuples;
 
     public LiteralResultIterationPlan(StatementContext context, 
             FilterableStatement statement, TableRef tableRef, RowProjector 
projection, 
             Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory) {
-        this(Collections.<Tuple> singletonList(new 
SingleKeyValueTuple(KeyValue.LOWESTKEY)).iterator(), 
+        this(Collections.<Tuple> singletonList(new 
SingleKeyValueTuple(KeyValue.LOWESTKEY)), 
                 context, statement, tableRef, projection, limit, orderBy, 
parallelIteratorFactory);
     }
 
-    public LiteralResultIterationPlan(Iterator<Tuple> tupleIterator, 
StatementContext context, 
+    public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext 
context, 
             FilterableStatement statement, TableRef tableRef, RowProjector 
projection, 
             Integer limit, OrderBy orderBy, ParallelIteratorFactory 
parallelIteratorFactory) {
-        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory);
-        this.tupleIterator = tupleIterator;
+        super(context, statement, tableRef, projection, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null);
+        this.tuples = tuples;
     }
 
     @Override
@@ -74,6 +74,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
+            private final Iterator<Tuple> tupleIterator = tuples.iterator();
             private boolean closed = false;
             private int count = 0;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
new file mode 100644
index 0000000..89dd082
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public interface RuntimeContext {
+
+    public abstract void defineCorrelateVariable(String variableId, TableRef 
def);
+
+    public abstract TableRef getCorrelateVariableDef(String variableId);
+
+    public abstract void setCorrelateVariableValue(String variableId, Tuple 
value);
+
+    public abstract Tuple getCorrelateVariableValue(String variableId);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
new file mode 100644
index 0000000..6a1ba4a
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.util.Map;
+
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import com.google.common.collect.Maps;
+
+public class RuntimeContextImpl implements RuntimeContext {
+    Map<String, VariableEntry> correlateVariables;
+
+    public RuntimeContextImpl() {
+        this.correlateVariables = Maps.newHashMap();
+    }
+    
+    @Override
+    public void defineCorrelateVariable(String variableId, TableRef def) {
+        this.correlateVariables.put(variableId, new VariableEntry(def));
+    }
+    
+    @Override
+    public TableRef getCorrelateVariableDef(String variableId) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' 
undefined.");
+        
+        return entry.getDef();
+    }
+    
+    @Override
+    public void setCorrelateVariableValue(String variableId, Tuple value) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' 
undefined.");
+        
+        entry.setValue(value);
+    }
+
+    @Override
+    public Tuple getCorrelateVariableValue(String variableId) {
+        VariableEntry entry = this.correlateVariables.get(variableId);
+        if (entry == null)
+            throw new RuntimeException("Variable '" + variableId + "' 
undefined.");
+        
+        return entry.getValue();
+    }
+    
+    private static class VariableEntry {
+        private final TableRef def;
+        private Tuple value;
+        
+        VariableEntry(TableRef def) {
+            this.def = def;
+        }
+        
+        TableRef getDef() {
+            return def;
+        }
+        
+        Tuple getValue() {
+            return value;
+        }
+        
+        void setValue(Tuple value) {
+            this.value = value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index e9b8a3a..9f7e482 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.ChunkedResultIterator;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LimitingResultIterator;
@@ -75,11 +76,15 @@ public class ScanPlan extends BaseQueryPlan {
     private List<KeyRange> splits;
     private List<List<Scan>> scans;
     private boolean allowPageFilter;
-
+    
     public ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) 
throws SQLException {
+        this(context, statement, table, projector, limit, orderBy, 
parallelIteratorFactory, allowPageFilter, null);
+    }
+    
+    private ScanPlan(StatementContext context, FilterableStatement statement, 
TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, 
ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
Expression dynamicFilter) throws SQLException {
         super(context, statement, table, projector, 
context.getBindManager().getParameterMetaData(), limit, orderBy, 
GroupBy.EMPTY_GROUP_BY,
                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                        buildResultIteratorFactory(context, table, orderBy, 
limit, allowPageFilter));
+                        buildResultIteratorFactory(context, table, orderBy, 
limit, allowPageFilter), dynamicFilter);
         this.allowPageFilter = allowPageFilter;
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
             int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 1bbda07..297b6cc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -83,6 +83,7 @@ public class SortMergeJoinPlan implements QueryPlan {
     private final KeyValueSchema rhsSchema;
     private final int rhsFieldPosition;
     private final boolean isSingleValueOnly;
+    private final int thresholdBytes;
 
     public SortMergeJoinPlan(StatementContext context, FilterableStatement 
statement, TableRef table, 
             JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, 
List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
@@ -101,6 +102,8 @@ public class SortMergeJoinPlan implements QueryPlan {
         this.rhsSchema = buildSchema(rhsTable);
         this.rhsFieldPosition = rhsFieldPosition;
         this.isSingleValueOnly = isSingleValueOnly;
+        this.thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
+                QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
     }
 
     private static KeyValueSchema buildSchema(PTable table) {
@@ -244,8 +247,6 @@ public class SortMergeJoinPlan implements QueryPlan {
             int len = lhsBitSet.getEstimatedLength();
             this.emptyProjectedValue = new byte[len];
             lhsBitSet.toBytes(emptyProjectedValue, 0);
-            int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
             this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
             this.queueIterator = null;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index c4a6b20..c8fef3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -47,7 +48,7 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator() throws SQLException {
-        return new UnnestArrayResultIterator(delegate.iterator());
+        return iterator(DefaultParallelScanGrouper.getInstance());
     }
 
     @Override
@@ -61,6 +62,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
         planSteps.add("UNNEST");
         return new ExplainPlan(planSteps);
     }
+    
+    @Override
+    public Integer getLimit() {
+        return null;
+    }
 
     public class UnnestArrayResultIterator extends DelegateResultIterator {
         private final UnnestArrayElemRefExpression elemRefExpression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
new file mode 100644
index 0000000..7ba43c7
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+
+public class CorrelateVariableFieldAccessExpression extends 
BaseTerminalExpression {
+    private final RuntimeContext runtimeContext;
+    private final String variableId;
+    private final Expression fieldAccessExpression;
+    
+    public CorrelateVariableFieldAccessExpression(RuntimeContext context, 
String variableId, Expression fieldAccessExpression) {
+        super();
+        this.runtimeContext = context;
+        this.variableId = variableId;
+        this.fieldAccessExpression = fieldAccessExpression;
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        Tuple variable = runtimeContext.getCorrelateVariableValue(variableId);
+        if (variable == null)
+            throw new RuntimeException("Variable '" + variableId + "' not 
set.");
+        
+        return fieldAccessExpression.evaluate(variable, ptr);
+    }
+
+    @Override
+    public <T> T accept(ExpressionVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        boolean success = evaluate(null, ptr);
+        Object value = success ? getDataType().toObject(ptr) : null;
+        try {
+            LiteralExpression expr = LiteralExpression.newConstant(value, 
getDataType());
+            expr.write(output);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+    @Override
+    public PDataType getDataType() {
+        return this.fieldAccessExpression.getDataType();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 07f3bdd..f98c787 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -285,7 +285,7 @@ public enum ExpressionType {
      * Return the ExpressionType for a given Expression instance
      */
     public static ExpressionType valueOf(Expression expression) {
-        ExpressionType type = classToEnumMap.get(expression.getClass());
+        ExpressionType type = valueOfOrNull(expression);
         if (type == null) { // FIXME: this exception gets swallowed and 
retries happen
             throw new IllegalArgumentException("No ExpressionType for " + 
expression.getClass());
         }
@@ -297,7 +297,14 @@ public enum ExpressionType {
      * or null if none exists.
      */
     public static ExpressionType valueOfOrNull(Expression expression) {
-        return classToEnumMap.get(expression.getClass());
+        Class <? extends Expression> clazz = expression.getClass();
+        // We will not have CorrelateVariableFieldAccessExpression on the 
server side,
+        // it will be evaluated at client side and will be serialized as 
+        // LiteralExpression instead.
+        if (clazz == CorrelateVariableFieldAccessExpression.class) {
+            clazz = LiteralExpression.class;
+        }
+        return classToEnumMap.get(clazz);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig
deleted file mode 100644
index d28c1f9..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.expression;
-
-import java.util.Map;
-
-import org.apache.phoenix.expression.function.AbsFunction;
-import org.apache.phoenix.expression.function.ArrayAllComparisonExpression;
-import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression;
-import org.apache.phoenix.expression.function.ArrayAppendFunction;
-import org.apache.phoenix.expression.function.ArrayElemRefExpression;
-import org.apache.phoenix.expression.function.ArrayIndexFunction;
-import org.apache.phoenix.expression.function.ArrayLengthFunction;
-import org.apache.phoenix.expression.function.ArrayPrependFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction;
-import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction;
-import org.apache.phoenix.expression.function.CbrtFunction;
-import org.apache.phoenix.expression.function.CeilDateExpression;
-import org.apache.phoenix.expression.function.CeilDecimalExpression;
-import org.apache.phoenix.expression.function.CeilFunction;
-import org.apache.phoenix.expression.function.CeilTimestampExpression;
-import org.apache.phoenix.expression.function.CoalesceFunction;
-import org.apache.phoenix.expression.function.ConvertTimezoneFunction;
-import org.apache.phoenix.expression.function.CountAggregateFunction;
-import org.apache.phoenix.expression.function.DayOfMonthFunction;
-import org.apache.phoenix.expression.function.DecodeFunction;
-import org.apache.phoenix.expression.function.DistinctCountAggregateFunction;
-import org.apache.phoenix.expression.function.DistinctValueAggregateFunction;
-import org.apache.phoenix.expression.function.EncodeFunction;
-import org.apache.phoenix.expression.function.ExpFunction;
-import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
-import org.apache.phoenix.expression.function.FirstValueFunction;
-import org.apache.phoenix.expression.function.FloorDateExpression;
-import org.apache.phoenix.expression.function.FloorDecimalExpression;
-import org.apache.phoenix.expression.function.FloorFunction;
-import org.apache.phoenix.expression.function.HourFunction;
-import org.apache.phoenix.expression.function.IndexStateNameFunction;
-import org.apache.phoenix.expression.function.InstrFunction;
-import org.apache.phoenix.expression.function.InvertFunction;
-import org.apache.phoenix.expression.function.LTrimFunction;
-import org.apache.phoenix.expression.function.LastValueFunction;
-import org.apache.phoenix.expression.function.LengthFunction;
-import org.apache.phoenix.expression.function.LnFunction;
-import org.apache.phoenix.expression.function.LogFunction;
-import org.apache.phoenix.expression.function.LowerFunction;
-import org.apache.phoenix.expression.function.LpadFunction;
-import org.apache.phoenix.expression.function.MD5Function;
-import org.apache.phoenix.expression.function.MaxAggregateFunction;
-import org.apache.phoenix.expression.function.MinAggregateFunction;
-import org.apache.phoenix.expression.function.MinuteFunction;
-import org.apache.phoenix.expression.function.MonthFunction;
-import org.apache.phoenix.expression.function.NowFunction;
-import org.apache.phoenix.expression.function.NthValueFunction;
-import org.apache.phoenix.expression.function.PercentRankAggregateFunction;
-import org.apache.phoenix.expression.function.PercentileContAggregateFunction;
-import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction;
-import org.apache.phoenix.expression.function.PowerFunction;
-import org.apache.phoenix.expression.function.RTrimFunction;
-import org.apache.phoenix.expression.function.RandomFunction;
-import org.apache.phoenix.expression.function.RegexpReplaceFunction;
-import org.apache.phoenix.expression.function.RegexpSplitFunction;
-import org.apache.phoenix.expression.function.RegexpSubstrFunction;
-import org.apache.phoenix.expression.function.ReverseFunction;
-import org.apache.phoenix.expression.function.RoundDateExpression;
-import org.apache.phoenix.expression.function.RoundDecimalExpression;
-import org.apache.phoenix.expression.function.RoundFunction;
-import org.apache.phoenix.expression.function.RoundTimestampExpression;
-import org.apache.phoenix.expression.function.SQLIndexTypeFunction;
-import org.apache.phoenix.expression.function.SQLTableTypeFunction;
-import org.apache.phoenix.expression.function.SQLViewTypeFunction;
-import org.apache.phoenix.expression.function.SecondFunction;
-import org.apache.phoenix.expression.function.SignFunction;
-import org.apache.phoenix.expression.function.SqlTypeNameFunction;
-import org.apache.phoenix.expression.function.SqrtFunction;
-import org.apache.phoenix.expression.function.StddevPopFunction;
-import org.apache.phoenix.expression.function.StddevSampFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction;
-import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction;
-import org.apache.phoenix.expression.function.SubstrFunction;
-import org.apache.phoenix.expression.function.SumAggregateFunction;
-import org.apache.phoenix.expression.function.TimezoneOffsetFunction;
-import org.apache.phoenix.expression.function.ToCharFunction;
-import org.apache.phoenix.expression.function.ToDateFunction;
-import org.apache.phoenix.expression.function.ToNumberFunction;
-import org.apache.phoenix.expression.function.ToTimeFunction;
-import org.apache.phoenix.expression.function.ToTimestampFunction;
-import org.apache.phoenix.expression.function.TrimFunction;
-import org.apache.phoenix.expression.function.TruncFunction;
-import org.apache.phoenix.expression.function.UDFExpression;
-import org.apache.phoenix.expression.function.UpperFunction;
-import org.apache.phoenix.expression.function.WeekFunction;
-import org.apache.phoenix.expression.function.YearFunction;
-
-import com.google.common.collect.Maps;
-
-/**
- *
- * Enumeration of all Expression types that will be looked up. They may be 
evaluated on the server-side.
- * Used during serialization and deserialization to pass Expression between 
client
- * and server.
- *
- *
- * @since 0.1
- */
-//Important : When you want to add new Types make sure to add those towards 
the end, not changing the existing type's
-//ordinal
-public enum ExpressionType {
-    ReverseFunction(ReverseFunction.class),
-    RowKey(RowKeyColumnExpression.class),
-    KeyValue(KeyValueColumnExpression.class),
-    LiteralValue(LiteralExpression.class),
-    RoundFunction(RoundFunction.class),
-    FloorFunction(FloorFunction.class),
-    CeilFunction(CeilFunction.class),
-    RoundDateExpression(RoundDateExpression.class),
-    FloorDateExpression(FloorDateExpression.class),
-    CeilDateExpression(CeilDateExpression.class),
-    RoundTimestampExpression(RoundTimestampExpression.class),
-    CeilTimestampExpression(CeilTimestampExpression.class),
-    RoundDecimalExpression(RoundDecimalExpression.class),
-    FloorDecimalExpression(FloorDecimalExpression.class),
-    CeilDecimalExpression(CeilDecimalExpression.class),
-    TruncFunction(TruncFunction.class),
-    ToDateFunction(ToDateFunction.class),
-    ToCharFunction(ToCharFunction.class),
-    ToNumberFunction(ToNumberFunction.class),
-    CoerceFunction(CoerceExpression.class),
-    SubstrFunction(SubstrFunction.class),
-    AndExpression(AndExpression.class),
-    OrExpression(OrExpression.class),
-    ComparisonExpression(ComparisonExpression.class),
-    CountAggregateFunction(CountAggregateFunction.class),
-    SumAggregateFunction(SumAggregateFunction.class),
-    MinAggregateFunction(MinAggregateFunction.class),
-    MaxAggregateFunction(MaxAggregateFunction.class),
-    StringBasedLikeExpression(StringBasedLikeExpression.class),
-    NotExpression(NotExpression.class),
-    CaseExpression(CaseExpression.class),
-    InListExpression(InListExpression.class),
-    IsNullExpression(IsNullExpression.class),
-    LongSubtractExpression(LongSubtractExpression.class),
-    DateSubtractExpression(DateSubtractExpression.class),
-    DecimalSubtractExpression(DecimalSubtractExpression.class),
-    LongAddExpression(LongAddExpression.class),
-    DecimalAddExpression(DecimalAddExpression.class),
-    DateAddExpression(DateAddExpression.class),
-    LongMultiplyExpression(LongMultiplyExpression.class),
-    DecimalMultiplyExpression(DecimalMultiplyExpression.class),
-    LongDivideExpression(LongDivideExpression.class),
-    DecimalDivideExpression(DecimalDivideExpression.class),
-    CoalesceFunction(CoalesceFunction.class),
-    StringBasedRegexpReplaceFunction(StringBasedRegexpReplaceFunction.class),
-    SQLTypeNameFunction(SqlTypeNameFunction.class),
-    StringBasedRegexpSubstrFunction(StringBasedRegexpSubstrFunction.class),
-    StringConcatExpression(StringConcatExpression.class),
-    LengthFunction(LengthFunction.class),
-    LTrimFunction(LTrimFunction.class),
-    RTrimFunction(RTrimFunction.class),
-    UpperFunction(UpperFunction.class),
-    LowerFunction(LowerFunction.class),
-    TrimFunction(TrimFunction.class),
-    DistinctCountAggregateFunction(DistinctCountAggregateFunction.class),
-    PercentileContAggregateFunction(PercentileContAggregateFunction.class),
-    PercentRankAggregateFunction(PercentRankAggregateFunction.class),
-    StddevPopFunction(StddevPopFunction.class),
-    StddevSampFunction(StddevSampFunction.class),
-    PercentileDiscAggregateFunction(PercentileDiscAggregateFunction.class),
-    DoubleAddExpression(DoubleAddExpression.class),
-    DoubleSubtractExpression(DoubleSubtractExpression.class),
-    DoubleMultiplyExpression(DoubleMultiplyExpression.class),
-    DoubleDivideExpression(DoubleDivideExpression.class),
-    RowValueConstructorExpression(RowValueConstructorExpression.class),
-    MD5Function(MD5Function.class),
-    SQLTableTypeFunction(SQLTableTypeFunction.class),
-    IndexStateName(IndexStateNameFunction.class),
-    InvertFunction(InvertFunction.class),
-    ProjectedColumnExpression(ProjectedColumnExpression.class),
-    TimestampAddExpression(TimestampAddExpression.class),
-    TimestampSubtractExpression(TimestampSubtractExpression.class),
-    ArrayIndexFunction(ArrayIndexFunction.class),
-    ArrayLengthFunction(ArrayLengthFunction.class),
-    ArrayConstructorExpression(ArrayConstructorExpression.class),
-    SQLViewTypeFunction(SQLViewTypeFunction.class),
-    ExternalSqlTypeIdFunction(ExternalSqlTypeIdFunction.class),
-    ConvertTimezoneFunction(ConvertTimezoneFunction.class),
-    DecodeFunction(DecodeFunction.class),
-    TimezoneOffsetFunction(TimezoneOffsetFunction.class),
-    EncodeFunction(EncodeFunction.class),
-    LpadFunction(LpadFunction.class),
-    NthValueFunction(NthValueFunction.class),
-    FirstValueFunction(FirstValueFunction.class),
-    LastValueFunction(LastValueFunction.class),
-    ArrayAnyComparisonExpression(ArrayAnyComparisonExpression.class),
-    ArrayAllComparisonExpression(ArrayAllComparisonExpression.class),
-    InlineArrayElemRefExpression(ArrayElemRefExpression.class),
-    SQLIndexTypeFunction(SQLIndexTypeFunction.class),
-    ModulusExpression(ModulusExpression.class),
-    DistinctValueAggregateFunction(DistinctValueAggregateFunction.class),
-    StringBasedRegexpSplitFunction(StringBasedRegexpSplitFunction.class),
-    RandomFunction(RandomFunction.class),
-    ToTimeFunction(ToTimeFunction.class),
-    ToTimestampFunction(ToTimestampFunction.class),
-    ByteBasedLikeExpression(ByteBasedLikeExpression.class),
-    ByteBasedRegexpReplaceFunction(ByteBasedRegexpReplaceFunction.class),
-    ByteBasedRegexpSubstrFunction(ByteBasedRegexpSubstrFunction.class),
-    ByteBasedRegexpSplitFunction(ByteBasedRegexpSplitFunction.class),
-    LikeExpression(LikeExpression.class),
-    RegexpReplaceFunction(RegexpReplaceFunction.class),
-    RegexpSubstrFunction(RegexpSubstrFunction.class),
-    RegexpSplitFunction(RegexpSplitFunction.class),
-    SignFunction(SignFunction.class),
-    YearFunction(YearFunction.class),
-    MonthFunction(MonthFunction.class),
-    SecondFunction(SecondFunction.class),
-    WeekFunction(WeekFunction.class),
-    HourFunction(HourFunction.class),
-    NowFunction(NowFunction.class),
-    InstrFunction(InstrFunction.class),
-    MinuteFunction(MinuteFunction.class),
-    DayOfMonthFunction(DayOfMonthFunction.class),
-    ArrayAppendFunction(ArrayAppendFunction.class),
-    UDFExpression(UDFExpression.class),
-    ArrayPrependFunction(ArrayPrependFunction.class),
-    SqrtFunction(SqrtFunction.class),
-    AbsFunction(AbsFunction.class),
-    CbrtFunction(CbrtFunction.class),
-    LnFunction(LnFunction.class),
-    LogFunction(LogFunction.class),
-    ExpFunction(ExpFunction.class),
-    PowerFunction(PowerFunction.class)
-    ;
-
-    ExpressionType(Class<? extends Expression> clazz) {
-        this.clazz = clazz;
-    }
-
-    public Class<? extends Expression> getExpressionClass() {
-        return clazz;
-    }
-
-    private final Class<? extends Expression> clazz;
-
-    private static final Map<Class<? extends Expression>,ExpressionType> 
classToEnumMap = Maps.newHashMapWithExpectedSize(3);
-    static {
-        for (ExpressionType type : ExpressionType.values()) {
-            classToEnumMap.put(type.clazz, type);
-        }
-    }
-
-    /**
-     * Return the ExpressionType for a given Expression instance
-     */
-    public static ExpressionType valueOf(Expression expression) {
-        ExpressionType type = classToEnumMap.get(expression.getClass());
-        if (type == null) { // FIXME: this exception gets swallowed and 
retries happen
-            throw new IllegalArgumentException("No ExpressionType for " + 
expression.getClass());
-        }
-        return type;
-    }
-
-    /**
-     * Return the ExpressionType for a given Expression instance
-     * or null if none exists.
-     */
-    public static ExpressionType valueOfOrNull(Expression expression) {
-        return classToEnumMap.get(expression.getClass());
-    }
-
-    /**
-     * Instantiates a DataAccessor based on its DataAccessorType
-     */
-    public Expression newInstance() {
-        try {
-            return clazz.newInstance();
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
index 18b8795..8d14545 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java
@@ -26,6 +26,7 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -61,6 +62,11 @@ public abstract class CloneExpressionVisitor extends 
TraverseAllExpressionVisito
     }
 
     @Override
+    public Expression visit(CorrelateVariableFieldAccessExpression node) {
+        return node;
+    }
+
+    @Override
     public Expression visit(LiteralExpression node) {
         return node;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
index 0a8d3ad..31f340d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java
@@ -27,6 +27,7 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
@@ -108,6 +109,7 @@ public interface ExpressionVisitor<E> {
     public Iterator<Expression> visitEnter(ArrayConstructorExpression node);
     public E visitLeave(ArrayConstructorExpression node, List<E> l);
     
+    public E visit(CorrelateVariableFieldAccessExpression node);
     public E visit(LiteralExpression node);
     public E visit(RowKeyColumnExpression node);
     public E visit(KeyValueColumnExpression node);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
index e7e7c67..3b7067a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java
@@ -26,7 +26,9 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -99,6 +101,11 @@ public class StatelessTraverseAllExpressionVisitor<E> 
extends TraverseAllExpress
     }
 
     @Override
+    public E visit(CorrelateVariableFieldAccessExpression node) {
+        return null;
+    }
+
+    @Override
     public E visit(LiteralExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
index 019754f..83b28bd 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java
@@ -26,7 +26,9 @@ import 
org.apache.phoenix.expression.ArrayConstructorExpression;
 import org.apache.phoenix.expression.CaseExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 import org.apache.phoenix.expression.DivideExpression;
+import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.InListExpression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -99,6 +101,11 @@ public class StatelessTraverseNoExpressionVisitor<E> 
extends TraverseNoExpressio
     }
 
     @Override
+    public E visit(CorrelateVariableFieldAccessExpression node) {
+        return null;
+    }
+
+    @Override
     public E visit(LiteralExpression node) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
index 797f3ce..e3d0987 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -39,7 +39,8 @@ public class UngroupedAggregatingResultIterator extends 
GroupedAggregatingResult
         Tuple result = super.next();
         // Ensure ungrouped aggregregation always returns a row, even if the 
underlying iterator doesn't.
         if (result == null && !hasRows) {
-            // Generate value using unused ClientAggregators
+            // We should reset ClientAggregators here in case they are being 
reused in a new ResultIterator.
+            aggregators.reset(aggregators.getAggregators());
             byte[] value = aggregators.toBytes(aggregators.getAggregators());
             result = new SingleKeyValueTuple(
                     KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
new file mode 100644
index 0000000..7ae3757
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.JoinCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.expression.ComparisonExpression;
+import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class CorrelatePlanTest {
+    
+    private static final StatementContext CONTEXT;
+    static {
+        try {
+            PhoenixConnection connection = 
DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
CONNECTIONLESS).unwrap(PhoenixConnection.class);
+            PhoenixStatement stmt = new PhoenixStatement(connection);
+            ColumnResolver resolver = 
FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
+            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new 
SequenceManager(stmt));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    private static final Object[][] LEFT_RELATION = new Object[][] {
+            {1, "1"},
+            {2, "2"},
+            {3, "3"},
+            {4, "4"},
+            {5, "5"},
+    };
+    
+    private static final Object[][] RIGHT_RELATION = new Object[][] {
+            {"2", 20},
+            {"2", 40},
+            {"5", 50},
+            {"6", 60},
+            {"5", 100},
+            {"1", 10},
+            {"3", 30},
+    };        
+    
+    @Test
+    public void testCorrelatePlanWithInnerJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+                {3, "3", "3", 30},
+                {5, "5", "5", 50},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, 
expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithLeftJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+                {3, "3", "3", 30},
+                {4, "4", null, null},
+                {5, "5", "5", 50},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Left, 
expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithSemiJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1"},
+                {2, "2"},
+                {3, "3"},
+                {5, "5"},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Semi, 
expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithAntiJoinType() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {4, "4"},
+        };
+        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Anti, 
expected);
+    }
+    
+    @Test
+    public void testCorrelatePlanWithSingleValueOnly() throws SQLException {
+        Object[][] expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {2, "2", "2", 40},
+        };
+        try {
+            testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, 
JoinType.Inner, expected);
+        } catch (SQLException e) {
+            
assertEquals(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS.getErrorCode(),
 e.getErrorCode());
+        }
+        
+        Object[][] rightRelation = new Object[][] {
+                {"2", 20},
+                {"6", 60},
+                {"5", 100},
+                {"1", 10},
+        };        
+        expected = new Object[][] {
+                {1, "1", "1", 10},
+                {2, "2", "2", 20},
+                {5, "5", "5", 100},
+        };
+        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, 
expected);        
+    }
+    
+    private void testCorrelatePlan(Object[][] leftRelation, Object[][] 
rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, 
Object[][] expectedResult) throws SQLException {        
+        TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]);
+        TableRef rightTable = 
createProjectedTableFromLiterals(rightRelation[0]);
+        String varName = "$cor0";
+        RuntimeContext runtimeContext = new RuntimeContextImpl();
+        runtimeContext.defineCorrelateVariable(varName, leftTable);
+        QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation);
+        QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation);
+        Expression columnExpr = new ColumnRef(rightTable, 
rightCorrelColumn).newColumnExpression();
+        Expression fieldAccess = new 
CorrelateVariableFieldAccessExpression(runtimeContext, varName, new 
ColumnRef(leftTable, leftCorrelColumn).newColumnExpression());
+        Expression filter = ComparisonExpression.create(CompareOp.EQUAL, 
Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false);
+        rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, 
rightTable, RowProjector.EMPTY_PROJECTOR, null, filter, OrderBy.EMPTY_ORDER_BY, 
rightPlan);
+        PTable joinedTable = 
JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), 
type);
+        CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, 
varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), 
rightTable.getTable(), leftTable.getTable().getColumns().size());
+        ResultIterator iter = correlatePlan.iterator();
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        for (Object[] row : expectedResult) {
+            Tuple next = iter.next();
+            assertNotNull(next);
+            for (int i = 0; i < row.length; i++) {
+                PColumn column = joinedTable.getColumns().get(i);
+                boolean eval = new ProjectedColumnExpression(column, 
joinedTable, column.getName().getString()).evaluate(next, ptr);
+                Object o = eval ? column.getDataType().toObject(ptr) : null;
+                assertEquals(row[i], o);
+            }
+        }
+    }
+    
+    private QueryPlan newLiteralResultIterationPlan(Object[][] rows) {
+        List<Tuple> tuples = Lists.newArrayList();
+        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
+        for (Object[] row : rows) {
+            Expression[] exprs = new Expression[row.length];
+            for (int i = 0; i < row.length; i++) {
+                exprs[i] = LiteralExpression.newConstant(row[i]);
+            }
+            TupleProjector projector = new TupleProjector(exprs);
+            tuples.add(projector.projectResults(baseTuple));
+        }
+        
+        return new LiteralResultIterationPlan(tuples, CONTEXT, 
SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, 
RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+    }
+
+
+    private TableRef createProjectedTableFromLiterals(Object[] row) {
+        List<PColumn> columns = Lists.<PColumn>newArrayList();
+        for (int i = 0; i < row.length; i++) {
+            String name = ParseNodeFactory.createTempAlias();
+            Expression expr = LiteralExpression.newConstant(row[i]);
+            columns.add(new PColumnImpl(PNameFactory.newName(name), 
PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
+                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), 
expr.isNullable(),
+                    i, expr.getSortOrder(), null, null, false, name));
+        }
+        try {
+            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, 
PName.EMPTY_NAME,
+                    PTableType.SUBQUERY, null, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
+                    null, null, columns, null, null, 
Collections.<PTable>emptyList(),
+                    false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
+                    null, null, true);
+            TableRef sourceTable = new TableRef(pTable);
+            List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> 
newArrayList();
+            for (PColumn column : sourceTable.getTable().getColumns()) {
+                sourceColumnRefs.add(new ColumnRef(sourceTable, 
column.getPosition()));
+            }
+        
+            return new 
TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, 
sourceColumnRefs, false));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }        
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
index 0def172..896f920 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -67,13 +67,13 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("rawtypes")
 public class UnnestArrayPlanTest {
     
-    private static final StatementContext context;
+    private static final StatementContext CONTEXT;
     static {
         try {
             PhoenixConnection connection = 
DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + 
CONNECTIONLESS).unwrap(PhoenixConnection.class);
             PhoenixStatement stmt = new PhoenixStatement(connection);
             ColumnResolver resolver = 
FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
-            context = new StatementContext(stmt, resolver, new Scan(), new 
SequenceManager(stmt));
+            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new 
SequenceManager(stmt));
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
@@ -112,7 +112,7 @@ public class UnnestArrayPlanTest {
     private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> 
arrays, boolean withOrdinality) throws Exception {
         PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - 
PDataType.ARRAY_TYPE_BASE);
         List<Tuple> tuples = toTuples(arrayType, arrays);
-        LiteralResultIterationPlan subPlan = new 
LiteralResultIterationPlan(tuples.iterator(), context, 
SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, 
RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+        LiteralResultIterationPlan subPlan = new 
LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, 
TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, 
OrderBy.EMPTY_ORDER_BY, null);
         LiteralExpression dummy = LiteralExpression.newConstant(null, 
arrayType);
         RowKeyValueAccessor accessor = new 
RowKeyValueAccessor(Arrays.asList(dummy), 0);
         UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new 
RowKeyColumnExpression(dummy, accessor), withOrdinality);

Reply via email to