Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 0657aeeef -> 8f3cd25e3
PHOENIX-2198 Support correlate variable Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8f3cd25e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8f3cd25e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8f3cd25e Branch: refs/heads/4.x-HBase-0.98 Commit: 8f3cd25e370c133311a9042e2ba0e2f37c713000 Parents: 0657aee Author: maryannxue <wei....@intel.com> Authored: Wed Sep 9 17:16:50 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Wed Sep 9 17:16:50 2015 -0400 ---------------------------------------------------------------------- .../apache/phoenix/execute/AggregatePlan.java | 9 +- .../apache/phoenix/execute/BaseQueryPlan.java | 22 +- .../apache/phoenix/execute/CorrelatePlan.java | 208 +++++++++++++ .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../apache/phoenix/execute/HashJoinPlan.java | 9 +- .../execute/LiteralResultIterationPlan.java | 11 +- .../apache/phoenix/execute/RuntimeContext.java | 33 ++ .../phoenix/execute/RuntimeContextImpl.java | 86 ++++++ .../org/apache/phoenix/execute/ScanPlan.java | 9 +- .../phoenix/execute/SortMergeJoinPlan.java | 5 +- .../apache/phoenix/execute/UnnestArrayPlan.java | 8 +- .../CorrelateVariableFieldAccessExpression.java | 75 +++++ .../phoenix/expression/ExpressionType.java | 11 +- .../phoenix/expression/ExpressionType.java.orig | 298 ------------------- .../visitor/CloneExpressionVisitor.java | 6 + .../expression/visitor/ExpressionVisitor.java | 2 + .../StatelessTraverseAllExpressionVisitor.java | 7 + .../StatelessTraverseNoExpressionVisitor.java | 7 + .../UngroupedAggregatingResultIterator.java | 3 +- .../phoenix/execute/CorrelatePlanTest.java | 248 +++++++++++++++ .../phoenix/execute/UnnestArrayPlanTest.java | 6 +- 21 files changed, 744 insertions(+), 321 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 67222d3..9a415b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -77,7 +77,14 @@ public class AggregatePlan extends BaseQueryPlan { StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) { - super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory); + this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null); + } + + private AggregatePlan( + StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, + Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, + Expression having, Expression dynamicFilter) { + super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory, dynamicFilter); this.having = having; this.aggregators = context.getAggregationManager().getAggregators(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index f14f574..5e58cbd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -40,7 +40,9 @@ import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; @@ -93,12 +95,19 @@ public abstract class BaseQueryPlan implements QueryPlan { protected final Integer limit; protected final OrderBy orderBy; protected final GroupBy groupBy; - protected final ParallelIteratorFactory parallelIteratorFactory; + protected final ParallelIteratorFactory parallelIteratorFactory; + /* + * The filter expression that contains CorrelateVariableFieldAccessExpression + * and will have impact on the ScanRanges. It will recompiled at runtime + * immediately before creating the ResultIterator. + */ + protected final Expression dynamicFilter; protected BaseQueryPlan( StatementContext context, FilterableStatement statement, TableRef table, RowProjector projection, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy, - GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory) { + GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory, + Expression dynamicFilter) { this.context = context; this.statement = statement; this.tableRef = table; @@ -108,6 +117,7 @@ public abstract class BaseQueryPlan implements QueryPlan { this.orderBy = orderBy; this.groupBy = groupBy; this.parallelIteratorFactory = parallelIteratorFactory; + this.dynamicFilter = dynamicFilter; } @Override @@ -141,6 +151,10 @@ public abstract class BaseQueryPlan implements QueryPlan { public RowProjector getProjector() { return projection; } + + public Expression getDynamicFilter() { + return dynamicFilter; + } // /** // * Sets up an id used to do round robin queue processing on the server @@ -175,6 +189,10 @@ public abstract class BaseQueryPlan implements QueryPlan { Scan scan = context.getScan(); PTable table = context.getCurrentTable().getTable(); + if (dynamicFilter != null) { + WhereCompiler.compile(context, statement, null, Collections.singletonList(dynamicFilter), false, null); + } + if (OrderBy.REV_ROW_KEY_ORDER_BY.equals(orderBy)) { ScanUtil.setReversed(scan); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java new file mode 100644 index 0000000..1b0af8c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple; +import org.apache.phoenix.iterate.DefaultParallelScanGrouper; +import org.apache.phoenix.iterate.ParallelScanGrouper; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.KeyValueSchema; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; + +public class CorrelatePlan extends DelegateQueryPlan { + private final QueryPlan rhs; + private final String variableId; + private final JoinType joinType; + private final boolean isSingleValueOnly; + private final RuntimeContext runtimeContext; + private final KeyValueSchema joinedSchema; + private final KeyValueSchema lhsSchema; + private final KeyValueSchema rhsSchema; + private final int rhsFieldPosition; + + public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, + JoinType joinType, boolean isSingleValueOnly, + RuntimeContext runtimeContext, PTable joinedTable, + PTable lhsTable, PTable rhsTable, int rhsFieldPosition) { + super(lhs); + if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti) + throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan"); + + this.rhs = rhs; + this.variableId = variableId; + this.joinType = joinType; + this.isSingleValueOnly = isSingleValueOnly; + this.runtimeContext = runtimeContext; + this.joinedSchema = buildSchema(joinedTable); + this.lhsSchema = buildSchema(lhsTable); + this.rhsSchema = buildSchema(rhsTable); + this.rhsFieldPosition = rhsFieldPosition; + } + + private static KeyValueSchema buildSchema(PTable table) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + if (table != null) { + for (PColumn column : table.getColumns()) { + if (!SchemaUtil.isPKColumn(column)) { + builder.addField(column); + } + } + } + return builder.build(); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List<String> steps = Lists.newArrayList(); + steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES"); + for (String step : delegate.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : "")); + for (String step : rhs.getExplainPlan().getPlanSteps()) { + steps.add(" " + step); + } + return new ExplainPlan(steps); + } + + @Override + public ResultIterator iterator() throws SQLException { + return iterator(DefaultParallelScanGrouper.getInstance()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper) + throws SQLException { + return new ResultIterator() { + private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema); + private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema); + private final ValueBitSet rhsBitSet = + (joinType == JoinType.Semi || joinType == JoinType.Anti) ? + ValueBitSet.EMPTY_VALUE_BITSET + : ValueBitSet.newInstance(rhsSchema); + private final ResultIterator iter = delegate.iterator(); + private ResultIterator rhsIter = null; + private Tuple current = null; + private boolean closed = false; + + @Override + public void close() throws SQLException { + if (!closed) { + closed = true; + iter.close(); + if (rhsIter != null) { + rhsIter.close(); + } + } + } + + @Override + public Tuple next() throws SQLException { + if (closed) + return null; + + Tuple rhsCurrent = null; + if (rhsIter != null) { + rhsCurrent = rhsIter.next(); + if (rhsCurrent == null) { + rhsIter.close(); + rhsIter = null; + } else if (isSingleValueOnly) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException(); + } + } + while (rhsIter == null) { + current = iter.next(); + if (current == null) { + close(); + return null; + } + runtimeContext.setCorrelateVariableValue(variableId, current); + rhsIter = rhs.iterator(); + rhsCurrent = rhsIter.next(); + if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi)) + || (rhsCurrent != null && joinType == JoinType.Anti)) { + rhsIter.close(); + rhsIter = null; + } + } + + Tuple joined; + try { + joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ? + current : TupleProjector.mergeProjectedValue( + convertLhs(current), joinedSchema, destBitSet, + rhsCurrent, rhsSchema, rhsBitSet, rhsFieldPosition); + } catch (IOException e) { + throw new SQLException(e); + } + + if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) { + rhsIter.close(); + rhsIter = null; + } + + return joined; + } + + @Override + public void explain(List<String> planSteps) { + } + + private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException { + ProjectedValueTuple t; + if (lhs instanceof ProjectedValueTuple) { + t = (ProjectedValueTuple) lhs; + } else { + ImmutableBytesWritable ptr = getContext().getTempPtr(); + TupleProjector.decodeProjectedValue(lhs, ptr); + lhsBitSet.clear(); + lhsBitSet.or(ptr); + int bitSetLen = lhsBitSet.getEstimatedLength(); + t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), + ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen); + + } + return t; + } + }; + } + + @Override + public Integer getLimit() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java index 98eb2dd..21b25d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java @@ -37,7 +37,7 @@ import org.apache.phoenix.schema.TableRef; public class DegenerateQueryPlan extends BaseQueryPlan { public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) { - super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null); + super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null); context.setScanRanges(ScanRanges.NOTHING); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 2ac728d..72920b2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -83,9 +83,9 @@ public class HashJoinPlan extends DelegateQueryPlan { private final HashJoinInfo joinInfo; private final SubPlan[] subPlans; private final boolean recompileWhereClause; + private final int maxServerCacheTimeToLive; private List<SQLCloseable> dependencies; private HashCacheClient hashClient; - private int maxServerCacheTimeToLive; private AtomicLong firstJobEndTime; private List<Expression> keyRangeExpressions; @@ -114,6 +114,8 @@ public class HashJoinPlan extends DelegateQueryPlan { this.joinInfo = joinInfo; this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; + this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt( + QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); } @Override @@ -130,8 +132,9 @@ public class HashJoinPlan extends DelegateQueryPlan { List<Future<Object>> futures = Lists.<Future<Object>>newArrayListWithExpectedSize(count); dependencies = Lists.newArrayList(); if (joinInfo != null) { - hashClient = new HashCacheClient(delegate.getContext().getConnection()); - maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); + hashClient = hashClient != null ? + hashClient + : new HashCacheClient(delegate.getContext().getConnection()); firstJobEndTime = new AtomicLong(0); keyRangeExpressions = new CopyOnWriteArrayList<Expression>(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index e7230cc..ab13e6c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -39,20 +39,20 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; public class LiteralResultIterationPlan extends BaseQueryPlan { - protected final Iterator<Tuple> tupleIterator; + protected final Iterable<Tuple> tuples; public LiteralResultIterationPlan(StatementContext context, FilterableStatement statement, TableRef tableRef, RowProjector projection, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) { - this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)).iterator(), + this(Collections.<Tuple> singletonList(new SingleKeyValueTuple(KeyValue.LOWESTKEY)), context, statement, tableRef, projection, limit, orderBy, parallelIteratorFactory); } - public LiteralResultIterationPlan(Iterator<Tuple> tupleIterator, StatementContext context, + public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext context, FilterableStatement statement, TableRef tableRef, RowProjector projection, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) { - super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory); - this.tupleIterator = tupleIterator; + super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null); + this.tuples = tuples; } @Override @@ -74,6 +74,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException { ResultIterator scanner = new ResultIterator() { + private final Iterator<Tuple> tupleIterator = tuples.iterator(); private boolean closed = false; private int count = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java new file mode 100644 index 0000000..89dd082 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContext.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; + +public interface RuntimeContext { + + public abstract void defineCorrelateVariable(String variableId, TableRef def); + + public abstract TableRef getCorrelateVariableDef(String variableId); + + public abstract void setCorrelateVariableValue(String variableId, Tuple value); + + public abstract Tuple getCorrelateVariableValue(String variableId); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java new file mode 100644 index 0000000..6a1ba4a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/RuntimeContextImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.util.Map; + +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.Tuple; + +import com.google.common.collect.Maps; + +public class RuntimeContextImpl implements RuntimeContext { + Map<String, VariableEntry> correlateVariables; + + public RuntimeContextImpl() { + this.correlateVariables = Maps.newHashMap(); + } + + @Override + public void defineCorrelateVariable(String variableId, TableRef def) { + this.correlateVariables.put(variableId, new VariableEntry(def)); + } + + @Override + public TableRef getCorrelateVariableDef(String variableId) { + VariableEntry entry = this.correlateVariables.get(variableId); + if (entry == null) + throw new RuntimeException("Variable '" + variableId + "' undefined."); + + return entry.getDef(); + } + + @Override + public void setCorrelateVariableValue(String variableId, Tuple value) { + VariableEntry entry = this.correlateVariables.get(variableId); + if (entry == null) + throw new RuntimeException("Variable '" + variableId + "' undefined."); + + entry.setValue(value); + } + + @Override + public Tuple getCorrelateVariableValue(String variableId) { + VariableEntry entry = this.correlateVariables.get(variableId); + if (entry == null) + throw new RuntimeException("Variable '" + variableId + "' undefined."); + + return entry.getValue(); + } + + private static class VariableEntry { + private final TableRef def; + private Tuple value; + + VariableEntry(TableRef def) { + this.def = def; + } + + TableRef getDef() { + return def; + } + + Tuple getValue() { + return value; + } + + void setValue(Tuple value) { + this.value = value; + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index e9b8a3a..9f7e482 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -29,6 +29,7 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.ChunkedResultIterator; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LimitingResultIterator; @@ -75,11 +76,15 @@ public class ScanPlan extends BaseQueryPlan { private List<KeyRange> splits; private List<List<Scan>> scans; private boolean allowPageFilter; - + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { + this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null); + } + + private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? parallelIteratorFactory : - buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter)); + buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter); this.allowPageFilter = allowPageFilter; if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 1bbda07..297b6cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -83,6 +83,7 @@ public class SortMergeJoinPlan implements QueryPlan { private final KeyValueSchema rhsSchema; private final int rhsFieldPosition; private final boolean isSingleValueOnly; + private final int thresholdBytes; public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions, @@ -101,6 +102,8 @@ public class SortMergeJoinPlan implements QueryPlan { this.rhsSchema = buildSchema(rhsTable); this.rhsFieldPosition = rhsFieldPosition; this.isSingleValueOnly = isSingleValueOnly; + this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( + QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); } private static KeyValueSchema buildSchema(PTable table) { @@ -244,8 +247,6 @@ public class SortMergeJoinPlan implements QueryPlan { int len = lhsBitSet.getEstimatedLength(); this.emptyProjectedValue = new byte[len]; lhsBitSet.toBytes(emptyProjectedValue, 0); - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); this.queue = new MappedByteBufferTupleQueue(thresholdBytes); this.queueIterator = null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java index c4a6b20..c8fef3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java @@ -27,6 +27,7 @@ import org.apache.phoenix.expression.BaseSingleExpression; import org.apache.phoenix.expression.BaseTerminalExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.DelegateResultIterator; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; @@ -47,7 +48,7 @@ public class UnnestArrayPlan extends DelegateQueryPlan { @Override public ResultIterator iterator() throws SQLException { - return new UnnestArrayResultIterator(delegate.iterator()); + return iterator(DefaultParallelScanGrouper.getInstance()); } @Override @@ -61,6 +62,11 @@ public class UnnestArrayPlan extends DelegateQueryPlan { planSteps.add("UNNEST"); return new ExplainPlan(planSteps); } + + @Override + public Integer getLimit() { + return null; + } public class UnnestArrayResultIterator extends DelegateResultIterator { private final UnnestArrayElemRefExpression elemRefExpression; http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java new file mode 100644 index 0000000..7ba43c7 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/CorrelateVariableFieldAccessExpression.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.expression; + +import java.io.DataOutput; +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; + +public class CorrelateVariableFieldAccessExpression extends BaseTerminalExpression { + private final RuntimeContext runtimeContext; + private final String variableId; + private final Expression fieldAccessExpression; + + public CorrelateVariableFieldAccessExpression(RuntimeContext context, String variableId, Expression fieldAccessExpression) { + super(); + this.runtimeContext = context; + this.variableId = variableId; + this.fieldAccessExpression = fieldAccessExpression; + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + Tuple variable = runtimeContext.getCorrelateVariableValue(variableId); + if (variable == null) + throw new RuntimeException("Variable '" + variableId + "' not set."); + + return fieldAccessExpression.evaluate(variable, ptr); + } + + @Override + public <T> T accept(ExpressionVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override + public void write(DataOutput output) throws IOException { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + boolean success = evaluate(null, ptr); + Object value = success ? getDataType().toObject(ptr) : null; + try { + LiteralExpression expr = LiteralExpression.newConstant(value, getDataType()); + expr.write(output); + } catch (SQLException e) { + throw new IOException(e); + } + } + + @SuppressWarnings("rawtypes") + @Override + public PDataType getDataType() { + return this.fieldAccessExpression.getDataType(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index 07f3bdd..f98c787 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -285,7 +285,7 @@ public enum ExpressionType { * Return the ExpressionType for a given Expression instance */ public static ExpressionType valueOf(Expression expression) { - ExpressionType type = classToEnumMap.get(expression.getClass()); + ExpressionType type = valueOfOrNull(expression); if (type == null) { // FIXME: this exception gets swallowed and retries happen throw new IllegalArgumentException("No ExpressionType for " + expression.getClass()); } @@ -297,7 +297,14 @@ public enum ExpressionType { * or null if none exists. */ public static ExpressionType valueOfOrNull(Expression expression) { - return classToEnumMap.get(expression.getClass()); + Class <? extends Expression> clazz = expression.getClass(); + // We will not have CorrelateVariableFieldAccessExpression on the server side, + // it will be evaluated at client side and will be serialized as + // LiteralExpression instead. + if (clazz == CorrelateVariableFieldAccessExpression.class) { + clazz = LiteralExpression.class; + } + return classToEnumMap.get(clazz); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig deleted file mode 100644 index d28c1f9..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java.orig +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.expression; - -import java.util.Map; - -import org.apache.phoenix.expression.function.AbsFunction; -import org.apache.phoenix.expression.function.ArrayAllComparisonExpression; -import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; -import org.apache.phoenix.expression.function.ArrayAppendFunction; -import org.apache.phoenix.expression.function.ArrayElemRefExpression; -import org.apache.phoenix.expression.function.ArrayIndexFunction; -import org.apache.phoenix.expression.function.ArrayLengthFunction; -import org.apache.phoenix.expression.function.ArrayPrependFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction; -import org.apache.phoenix.expression.function.CbrtFunction; -import org.apache.phoenix.expression.function.CeilDateExpression; -import org.apache.phoenix.expression.function.CeilDecimalExpression; -import org.apache.phoenix.expression.function.CeilFunction; -import org.apache.phoenix.expression.function.CeilTimestampExpression; -import org.apache.phoenix.expression.function.CoalesceFunction; -import org.apache.phoenix.expression.function.ConvertTimezoneFunction; -import org.apache.phoenix.expression.function.CountAggregateFunction; -import org.apache.phoenix.expression.function.DayOfMonthFunction; -import org.apache.phoenix.expression.function.DecodeFunction; -import org.apache.phoenix.expression.function.DistinctCountAggregateFunction; -import org.apache.phoenix.expression.function.DistinctValueAggregateFunction; -import org.apache.phoenix.expression.function.EncodeFunction; -import org.apache.phoenix.expression.function.ExpFunction; -import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction; -import org.apache.phoenix.expression.function.FirstValueFunction; -import org.apache.phoenix.expression.function.FloorDateExpression; -import org.apache.phoenix.expression.function.FloorDecimalExpression; -import org.apache.phoenix.expression.function.FloorFunction; -import org.apache.phoenix.expression.function.HourFunction; -import org.apache.phoenix.expression.function.IndexStateNameFunction; -import org.apache.phoenix.expression.function.InstrFunction; -import org.apache.phoenix.expression.function.InvertFunction; -import org.apache.phoenix.expression.function.LTrimFunction; -import org.apache.phoenix.expression.function.LastValueFunction; -import org.apache.phoenix.expression.function.LengthFunction; -import org.apache.phoenix.expression.function.LnFunction; -import org.apache.phoenix.expression.function.LogFunction; -import org.apache.phoenix.expression.function.LowerFunction; -import org.apache.phoenix.expression.function.LpadFunction; -import org.apache.phoenix.expression.function.MD5Function; -import org.apache.phoenix.expression.function.MaxAggregateFunction; -import org.apache.phoenix.expression.function.MinAggregateFunction; -import org.apache.phoenix.expression.function.MinuteFunction; -import org.apache.phoenix.expression.function.MonthFunction; -import org.apache.phoenix.expression.function.NowFunction; -import org.apache.phoenix.expression.function.NthValueFunction; -import org.apache.phoenix.expression.function.PercentRankAggregateFunction; -import org.apache.phoenix.expression.function.PercentileContAggregateFunction; -import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction; -import org.apache.phoenix.expression.function.PowerFunction; -import org.apache.phoenix.expression.function.RTrimFunction; -import org.apache.phoenix.expression.function.RandomFunction; -import org.apache.phoenix.expression.function.RegexpReplaceFunction; -import org.apache.phoenix.expression.function.RegexpSplitFunction; -import org.apache.phoenix.expression.function.RegexpSubstrFunction; -import org.apache.phoenix.expression.function.ReverseFunction; -import org.apache.phoenix.expression.function.RoundDateExpression; -import org.apache.phoenix.expression.function.RoundDecimalExpression; -import org.apache.phoenix.expression.function.RoundFunction; -import org.apache.phoenix.expression.function.RoundTimestampExpression; -import org.apache.phoenix.expression.function.SQLIndexTypeFunction; -import org.apache.phoenix.expression.function.SQLTableTypeFunction; -import org.apache.phoenix.expression.function.SQLViewTypeFunction; -import org.apache.phoenix.expression.function.SecondFunction; -import org.apache.phoenix.expression.function.SignFunction; -import org.apache.phoenix.expression.function.SqlTypeNameFunction; -import org.apache.phoenix.expression.function.SqrtFunction; -import org.apache.phoenix.expression.function.StddevPopFunction; -import org.apache.phoenix.expression.function.StddevSampFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction; -import org.apache.phoenix.expression.function.SubstrFunction; -import org.apache.phoenix.expression.function.SumAggregateFunction; -import org.apache.phoenix.expression.function.TimezoneOffsetFunction; -import org.apache.phoenix.expression.function.ToCharFunction; -import org.apache.phoenix.expression.function.ToDateFunction; -import org.apache.phoenix.expression.function.ToNumberFunction; -import org.apache.phoenix.expression.function.ToTimeFunction; -import org.apache.phoenix.expression.function.ToTimestampFunction; -import org.apache.phoenix.expression.function.TrimFunction; -import org.apache.phoenix.expression.function.TruncFunction; -import org.apache.phoenix.expression.function.UDFExpression; -import org.apache.phoenix.expression.function.UpperFunction; -import org.apache.phoenix.expression.function.WeekFunction; -import org.apache.phoenix.expression.function.YearFunction; - -import com.google.common.collect.Maps; - -/** - * - * Enumeration of all Expression types that will be looked up. They may be evaluated on the server-side. - * Used during serialization and deserialization to pass Expression between client - * and server. - * - * - * @since 0.1 - */ -//Important : When you want to add new Types make sure to add those towards the end, not changing the existing type's -//ordinal -public enum ExpressionType { - ReverseFunction(ReverseFunction.class), - RowKey(RowKeyColumnExpression.class), - KeyValue(KeyValueColumnExpression.class), - LiteralValue(LiteralExpression.class), - RoundFunction(RoundFunction.class), - FloorFunction(FloorFunction.class), - CeilFunction(CeilFunction.class), - RoundDateExpression(RoundDateExpression.class), - FloorDateExpression(FloorDateExpression.class), - CeilDateExpression(CeilDateExpression.class), - RoundTimestampExpression(RoundTimestampExpression.class), - CeilTimestampExpression(CeilTimestampExpression.class), - RoundDecimalExpression(RoundDecimalExpression.class), - FloorDecimalExpression(FloorDecimalExpression.class), - CeilDecimalExpression(CeilDecimalExpression.class), - TruncFunction(TruncFunction.class), - ToDateFunction(ToDateFunction.class), - ToCharFunction(ToCharFunction.class), - ToNumberFunction(ToNumberFunction.class), - CoerceFunction(CoerceExpression.class), - SubstrFunction(SubstrFunction.class), - AndExpression(AndExpression.class), - OrExpression(OrExpression.class), - ComparisonExpression(ComparisonExpression.class), - CountAggregateFunction(CountAggregateFunction.class), - SumAggregateFunction(SumAggregateFunction.class), - MinAggregateFunction(MinAggregateFunction.class), - MaxAggregateFunction(MaxAggregateFunction.class), - StringBasedLikeExpression(StringBasedLikeExpression.class), - NotExpression(NotExpression.class), - CaseExpression(CaseExpression.class), - InListExpression(InListExpression.class), - IsNullExpression(IsNullExpression.class), - LongSubtractExpression(LongSubtractExpression.class), - DateSubtractExpression(DateSubtractExpression.class), - DecimalSubtractExpression(DecimalSubtractExpression.class), - LongAddExpression(LongAddExpression.class), - DecimalAddExpression(DecimalAddExpression.class), - DateAddExpression(DateAddExpression.class), - LongMultiplyExpression(LongMultiplyExpression.class), - DecimalMultiplyExpression(DecimalMultiplyExpression.class), - LongDivideExpression(LongDivideExpression.class), - DecimalDivideExpression(DecimalDivideExpression.class), - CoalesceFunction(CoalesceFunction.class), - StringBasedRegexpReplaceFunction(StringBasedRegexpReplaceFunction.class), - SQLTypeNameFunction(SqlTypeNameFunction.class), - StringBasedRegexpSubstrFunction(StringBasedRegexpSubstrFunction.class), - StringConcatExpression(StringConcatExpression.class), - LengthFunction(LengthFunction.class), - LTrimFunction(LTrimFunction.class), - RTrimFunction(RTrimFunction.class), - UpperFunction(UpperFunction.class), - LowerFunction(LowerFunction.class), - TrimFunction(TrimFunction.class), - DistinctCountAggregateFunction(DistinctCountAggregateFunction.class), - PercentileContAggregateFunction(PercentileContAggregateFunction.class), - PercentRankAggregateFunction(PercentRankAggregateFunction.class), - StddevPopFunction(StddevPopFunction.class), - StddevSampFunction(StddevSampFunction.class), - PercentileDiscAggregateFunction(PercentileDiscAggregateFunction.class), - DoubleAddExpression(DoubleAddExpression.class), - DoubleSubtractExpression(DoubleSubtractExpression.class), - DoubleMultiplyExpression(DoubleMultiplyExpression.class), - DoubleDivideExpression(DoubleDivideExpression.class), - RowValueConstructorExpression(RowValueConstructorExpression.class), - MD5Function(MD5Function.class), - SQLTableTypeFunction(SQLTableTypeFunction.class), - IndexStateName(IndexStateNameFunction.class), - InvertFunction(InvertFunction.class), - ProjectedColumnExpression(ProjectedColumnExpression.class), - TimestampAddExpression(TimestampAddExpression.class), - TimestampSubtractExpression(TimestampSubtractExpression.class), - ArrayIndexFunction(ArrayIndexFunction.class), - ArrayLengthFunction(ArrayLengthFunction.class), - ArrayConstructorExpression(ArrayConstructorExpression.class), - SQLViewTypeFunction(SQLViewTypeFunction.class), - ExternalSqlTypeIdFunction(ExternalSqlTypeIdFunction.class), - ConvertTimezoneFunction(ConvertTimezoneFunction.class), - DecodeFunction(DecodeFunction.class), - TimezoneOffsetFunction(TimezoneOffsetFunction.class), - EncodeFunction(EncodeFunction.class), - LpadFunction(LpadFunction.class), - NthValueFunction(NthValueFunction.class), - FirstValueFunction(FirstValueFunction.class), - LastValueFunction(LastValueFunction.class), - ArrayAnyComparisonExpression(ArrayAnyComparisonExpression.class), - ArrayAllComparisonExpression(ArrayAllComparisonExpression.class), - InlineArrayElemRefExpression(ArrayElemRefExpression.class), - SQLIndexTypeFunction(SQLIndexTypeFunction.class), - ModulusExpression(ModulusExpression.class), - DistinctValueAggregateFunction(DistinctValueAggregateFunction.class), - StringBasedRegexpSplitFunction(StringBasedRegexpSplitFunction.class), - RandomFunction(RandomFunction.class), - ToTimeFunction(ToTimeFunction.class), - ToTimestampFunction(ToTimestampFunction.class), - ByteBasedLikeExpression(ByteBasedLikeExpression.class), - ByteBasedRegexpReplaceFunction(ByteBasedRegexpReplaceFunction.class), - ByteBasedRegexpSubstrFunction(ByteBasedRegexpSubstrFunction.class), - ByteBasedRegexpSplitFunction(ByteBasedRegexpSplitFunction.class), - LikeExpression(LikeExpression.class), - RegexpReplaceFunction(RegexpReplaceFunction.class), - RegexpSubstrFunction(RegexpSubstrFunction.class), - RegexpSplitFunction(RegexpSplitFunction.class), - SignFunction(SignFunction.class), - YearFunction(YearFunction.class), - MonthFunction(MonthFunction.class), - SecondFunction(SecondFunction.class), - WeekFunction(WeekFunction.class), - HourFunction(HourFunction.class), - NowFunction(NowFunction.class), - InstrFunction(InstrFunction.class), - MinuteFunction(MinuteFunction.class), - DayOfMonthFunction(DayOfMonthFunction.class), - ArrayAppendFunction(ArrayAppendFunction.class), - UDFExpression(UDFExpression.class), - ArrayPrependFunction(ArrayPrependFunction.class), - SqrtFunction(SqrtFunction.class), - AbsFunction(AbsFunction.class), - CbrtFunction(CbrtFunction.class), - LnFunction(LnFunction.class), - LogFunction(LogFunction.class), - ExpFunction(ExpFunction.class), - PowerFunction(PowerFunction.class) - ; - - ExpressionType(Class<? extends Expression> clazz) { - this.clazz = clazz; - } - - public Class<? extends Expression> getExpressionClass() { - return clazz; - } - - private final Class<? extends Expression> clazz; - - private static final Map<Class<? extends Expression>,ExpressionType> classToEnumMap = Maps.newHashMapWithExpectedSize(3); - static { - for (ExpressionType type : ExpressionType.values()) { - classToEnumMap.put(type.clazz, type); - } - } - - /** - * Return the ExpressionType for a given Expression instance - */ - public static ExpressionType valueOf(Expression expression) { - ExpressionType type = classToEnumMap.get(expression.getClass()); - if (type == null) { // FIXME: this exception gets swallowed and retries happen - throw new IllegalArgumentException("No ExpressionType for " + expression.getClass()); - } - return type; - } - - /** - * Return the ExpressionType for a given Expression instance - * or null if none exists. - */ - public static ExpressionType valueOfOrNull(Expression expression) { - return classToEnumMap.get(expression.getClass()); - } - - /** - * Instantiates a DataAccessor based on its DataAccessorType - */ - public Expression newInstance() { - try { - return clazz.newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java index 18b8795..8d14545 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; @@ -61,6 +62,11 @@ public abstract class CloneExpressionVisitor extends TraverseAllExpressionVisito } @Override + public Expression visit(CorrelateVariableFieldAccessExpression node) { + return node; + } + + @Override public Expression visit(LiteralExpression node) { return node; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java index 0a8d3ad..31f340d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java @@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; @@ -108,6 +109,7 @@ public interface ExpressionVisitor<E> { public Iterator<Expression> visitEnter(ArrayConstructorExpression node); public E visitLeave(ArrayConstructorExpression node, List<E> l); + public E visit(CorrelateVariableFieldAccessExpression node); public E visit(LiteralExpression node); public E visit(RowKeyColumnExpression node); public E visit(KeyValueColumnExpression node); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java index e7e7c67..3b7067a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java @@ -26,7 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -99,6 +101,11 @@ public class StatelessTraverseAllExpressionVisitor<E> extends TraverseAllExpress } @Override + public E visit(CorrelateVariableFieldAccessExpression node) { + return null; + } + + @Override public E visit(LiteralExpression node) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java index 019754f..83b28bd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java @@ -26,7 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -99,6 +101,11 @@ public class StatelessTraverseNoExpressionVisitor<E> extends TraverseNoExpressio } @Override + public E visit(CorrelateVariableFieldAccessExpression node) { + return null; + } + + @Override public E visit(LiteralExpression node) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java index 797f3ce..e3d0987 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java @@ -39,7 +39,8 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult Tuple result = super.next(); // Ensure ungrouped aggregregation always returns a row, even if the underlying iterator doesn't. if (result == null && !hasRows) { - // Generate value using unused ClientAggregators + // We should reset ClientAggregators here in case they are being reused in a new ResultIterator. + aggregators.reset(aggregators.getAggregators()); byte[] value = aggregators.toBytes(aggregators.getAggregators()); result = new SingleKeyValueTuple( KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java new file mode 100644 index 0000000..7ae3757 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.JoinCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.expression.ProjectedColumnExpression; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class CorrelatePlanTest { + + private static final StatementContext CONTEXT; + static { + try { + PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class); + PhoenixStatement stmt = new PhoenixStatement(connection); + ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection); + CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static final Object[][] LEFT_RELATION = new Object[][] { + {1, "1"}, + {2, "2"}, + {3, "3"}, + {4, "4"}, + {5, "5"}, + }; + + private static final Object[][] RIGHT_RELATION = new Object[][] { + {"2", 20}, + {"2", 40}, + {"5", 50}, + {"6", 60}, + {"5", 100}, + {"1", 10}, + {"3", 30}, + }; + + @Test + public void testCorrelatePlanWithInnerJoinType() throws SQLException { + Object[][] expected = new Object[][] { + {1, "1", "1", 10}, + {2, "2", "2", 20}, + {2, "2", "2", 40}, + {3, "3", "3", 30}, + {5, "5", "5", 50}, + {5, "5", "5", 100}, + }; + testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected); + } + + @Test + public void testCorrelatePlanWithLeftJoinType() throws SQLException { + Object[][] expected = new Object[][] { + {1, "1", "1", 10}, + {2, "2", "2", 20}, + {2, "2", "2", 40}, + {3, "3", "3", 30}, + {4, "4", null, null}, + {5, "5", "5", 50}, + {5, "5", "5", 100}, + }; + testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Left, expected); + } + + @Test + public void testCorrelatePlanWithSemiJoinType() throws SQLException { + Object[][] expected = new Object[][] { + {1, "1"}, + {2, "2"}, + {3, "3"}, + {5, "5"}, + }; + testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Semi, expected); + } + + @Test + public void testCorrelatePlanWithAntiJoinType() throws SQLException { + Object[][] expected = new Object[][] { + {4, "4"}, + }; + testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Anti, expected); + } + + @Test + public void testCorrelatePlanWithSingleValueOnly() throws SQLException { + Object[][] expected = new Object[][] { + {1, "1", "1", 10}, + {2, "2", "2", 20}, + {2, "2", "2", 40}, + }; + try { + testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS.getErrorCode(), e.getErrorCode()); + } + + Object[][] rightRelation = new Object[][] { + {"2", 20}, + {"6", 60}, + {"5", 100}, + {"1", 10}, + }; + expected = new Object[][] { + {1, "1", "1", 10}, + {2, "2", "2", 20}, + {5, "5", "5", 100}, + }; + testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected); + } + + private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, Object[][] expectedResult) throws SQLException { + TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]); + TableRef rightTable = createProjectedTableFromLiterals(rightRelation[0]); + String varName = "$cor0"; + RuntimeContext runtimeContext = new RuntimeContextImpl(); + runtimeContext.defineCorrelateVariable(varName, leftTable); + QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation); + QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation); + Expression columnExpr = new ColumnRef(rightTable, rightCorrelColumn).newColumnExpression(); + Expression fieldAccess = new CorrelateVariableFieldAccessExpression(runtimeContext, varName, new ColumnRef(leftTable, leftCorrelColumn).newColumnExpression()); + Expression filter = ComparisonExpression.create(CompareOp.EQUAL, Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false); + rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, rightTable, RowProjector.EMPTY_PROJECTOR, null, filter, OrderBy.EMPTY_ORDER_BY, rightPlan); + PTable joinedTable = JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), type); + CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), rightTable.getTable(), leftTable.getTable().getColumns().size()); + ResultIterator iter = correlatePlan.iterator(); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + for (Object[] row : expectedResult) { + Tuple next = iter.next(); + assertNotNull(next); + for (int i = 0; i < row.length; i++) { + PColumn column = joinedTable.getColumns().get(i); + boolean eval = new ProjectedColumnExpression(column, joinedTable, column.getName().getString()).evaluate(next, ptr); + Object o = eval ? column.getDataType().toObject(ptr) : null; + assertEquals(row[i], o); + } + } + } + + private QueryPlan newLiteralResultIterationPlan(Object[][] rows) { + List<Tuple> tuples = Lists.newArrayList(); + Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY); + for (Object[] row : rows) { + Expression[] exprs = new Expression[row.length]; + for (int i = 0; i < row.length; i++) { + exprs[i] = LiteralExpression.newConstant(row[i]); + } + TupleProjector projector = new TupleProjector(exprs); + tuples.add(projector.projectResults(baseTuple)); + } + + return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); + } + + + private TableRef createProjectedTableFromLiterals(Object[] row) { + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < row.length; i++) { + String name = ParseNodeFactory.createTempAlias(); + Expression expr = LiteralExpression.newConstant(row[i]); + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, + null, null, true); + TableRef sourceTable = new TableRef(pTable); + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + for (PColumn column : sourceTable.getTable().getColumns()) { + sourceColumnRefs.add(new ColumnRef(sourceTable, column.getPosition())); + } + + return new TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, sourceColumnRefs, false)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8f3cd25e/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java index 0def172..896f920 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java @@ -67,13 +67,13 @@ import com.google.common.collect.Lists; @SuppressWarnings("rawtypes") public class UnnestArrayPlanTest { - private static final StatementContext context; + private static final StatementContext CONTEXT; static { try { PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class); PhoenixStatement stmt = new PhoenixStatement(connection); ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection); - context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); } catch (SQLException e) { throw new RuntimeException(e); } @@ -112,7 +112,7 @@ public class UnnestArrayPlanTest { private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> arrays, boolean withOrdinality) throws Exception { PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE); List<Tuple> tuples = toTuples(arrayType, arrays); - LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples.iterator(), context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); + LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType); RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0); UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);