Repository: phoenix Updated Branches: refs/heads/4.x-cdh5.11.2 adcee3f0f -> 6b693e93f
PHOENIX-3941 Filter regions to scan for local indexes based on data table leading pk filter conditions Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b693e93 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b693e93 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b693e93 Branch: refs/heads/4.x-cdh5.11.2 Commit: 6b693e93fc957dc306da1664dcc5041ba9502322 Parents: adcee3f Author: James Taylor <jtay...@salesforce.com> Authored: Wed Feb 7 23:02:44 2018 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Feb 7 23:13:53 2018 -0800 ---------------------------------------------------------------------- .../apache/phoenix/compile/DeleteCompiler.java | 2 +- .../org/apache/phoenix/compile/ExplainPlan.java | 10 + .../apache/phoenix/compile/JoinCompiler.java | 10 +- .../apache/phoenix/compile/PostDDLCompiler.java | 2 +- .../apache/phoenix/compile/QueryCompiler.java | 20 +- .../org/apache/phoenix/compile/ScanRanges.java | 12 +- .../apache/phoenix/compile/UpsertCompiler.java | 4 +- .../apache/phoenix/execute/AggregatePlan.java | 12 +- .../apache/phoenix/execute/BaseQueryPlan.java | 6 +- .../execute/LiteralResultIterationPlan.java | 2 +- .../org/apache/phoenix/execute/ScanPlan.java | 18 +- .../phoenix/iterate/BaseResultIterators.java | 226 ++++++++++++++++++- .../apache/phoenix/iterate/ExplainTable.java | 3 +- .../phoenix/iterate/ParallelIterators.java | 8 +- .../apache/phoenix/iterate/SerialIterators.java | 4 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 2 +- .../apache/phoenix/optimize/QueryOptimizer.java | 4 +- .../query/ConnectionlessQueryServicesImpl.java | 8 +- .../phoenix/compile/QueryCompilerTest.java | 226 +++++++++++++++++++ .../apache/phoenix/query/KeyRangeClipTest.java | 155 +++++++++++++ .../query/ParallelIteratorsSplitTest.java | 2 +- 21 files changed, 674 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index fd80238..54e63d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -595,7 +595,7 @@ public class DeleteCompiler { } final RowProjector projector = projectorToBe; final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, dataPlan); return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes); } else { final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java index 2bc7809..ef34daa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java @@ -34,4 +34,14 @@ public class ExplainPlan { public List<String> getPlanSteps() { return planSteps; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + for (String step : planSteps) { + buf.append(step); + buf.append('\n'); + } + return buf.toString(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 439a79b..f3c4c24 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -17,8 +17,8 @@ */ package org.apache.phoenix.compile; -import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import java.sql.SQLException; import java.util.ArrayList; @@ -80,8 +80,6 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; -import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; @@ -1182,7 +1180,7 @@ public class JoinCompiler { } JoinTable join = compile(statement, select, resolver); if (groupByTableRef != null || orderByTableRef != null) { - QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false); + QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null); List<Object> binds = statement.getParameters(); StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null); @@ -1204,6 +1202,10 @@ public class JoinCompiler { List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null; List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null; SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes()); + // TODO: As port of PHOENIX-4585, we need to make sure this plan has a pointer to the data plan + // when an index is used instead of the data table, and that this method returns that + // state for downstream processing. + // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt); if (!plan.getTableRef().equals(tableRef)) { replacement.put(tableRef, plan.getTableRef()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index e5ed6a5..709534e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -283,7 +283,7 @@ public class PostDDLCompiler { continue; } QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null); try { ResultIterator iterator = plan.iterator(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 287f9e0..3b14850 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -104,16 +104,13 @@ public class QueryCompiler { private final boolean projectTuples; private final boolean useSortMergeJoin; private final boolean noChildParentJoinOptimization; + private final QueryPlan dataPlan; - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { - this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), true); + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException { + this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan); } - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples) throws SQLException { - this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples); - } - - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples) throws SQLException { + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException { this.statement = statement; this.select = select; this.resolver = resolver; @@ -133,10 +130,11 @@ public class QueryCompiler { scan.setCaching(statement.getFetchSize()); this.originalScan = ScanUtil.newScan(scan); + this.dataPlan = dataPlan; } public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException { - this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true); + this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, null); } /** @@ -495,7 +493,7 @@ public class QueryCompiler { } int maxRows = this.statement.getMaxRows(); this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries. - QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false).compile(); + QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, dataPlan).compile(); plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan); this.statement.setMaxRows(maxRows); // restore maxRows. return plan; @@ -586,9 +584,9 @@ public class QueryCompiler { parallelIteratorFactory) : (select.isAggregate() || select.isDistinct() ? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy, - parallelIteratorFactory, groupBy, having) + parallelIteratorFactory, groupBy, having, dataPlan) : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy, - parallelIteratorFactory, allowPageFilter)); + parallelIteratorFactory, allowPageFilter, dataPlan)); } if (!subqueries.isEmpty()) { int count = subqueries.size(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java index 18e575c..8c71248 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -147,8 +147,10 @@ public class ScanRanges { scanRange = KeyRange.getKeyRange(minKey, maxKey); } if (minMaxRange != KeyRange.EVERYTHING_RANGE) { - minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable()); - scanRange = scanRange.intersect(minMaxRange); + // Intersect using modified min/max range, but keep original range to ensure it + // can still be decomposed into it's parts + KeyRange inclusiveExclusiveMinMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable()); + scanRange = scanRange.intersect(inclusiveExclusiveMinMaxRange); } if (scanRange == KeyRange.EMPTY_RANGE) { @@ -573,7 +575,7 @@ public class ScanRanges { } public int getBoundPkColumnCount() { - return this.useSkipScanFilter ? ScanUtil.getRowKeyPosition(slotSpan, ranges.size()) : Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount()); + return Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount()); } private int getBoundMinMaxSlotCount() { @@ -625,6 +627,10 @@ public class ScanRanges { public int[] getSlotSpans() { return slotSpan; } + + public KeyRange getScanRange() { + return scanRange; + } public boolean hasEqualityConstraint(int pkPosition) { int pkOffset = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index a81a427..08133a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -549,7 +549,7 @@ public class UpsertCompiler { select = SelectStatement.create(select, hint); // Pass scan through if same table in upsert and select so that projection is computed correctly // Use optimizer to choose the best plan - QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); + QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, null); queryPlanToBe = compiler.compile(); // This is post-fix: if the tableRef is a projected table, this means there are post-processing // steps and parallelIteratorFactory did not take effect. @@ -697,7 +697,7 @@ public class UpsertCompiler { scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); // Ignore order by - it has no impact - final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan); return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 369769e..2e042e7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -90,17 +90,17 @@ public class AggregatePlan extends BaseQueryPlan { public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, - ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) throws SQLException { + ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, QueryPlan dataPlan) throws SQLException { this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, groupBy, having, - null); + null, dataPlan); } private AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, - Expression dynamicFilter) throws SQLException { + Expression dynamicFilter, QueryPlan dataPlan) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset, - orderBy, groupBy, parallelIteratorFactory, dynamicFilter); + orderBy, groupBy, parallelIteratorFactory, dynamicFilter, dataPlan); this.having = having; this.aggregators = context.getAggregationManager().getAggregators(); boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL); @@ -251,8 +251,8 @@ public class AggregatePlan extends BaseQueryPlan { } } BaseResultIterators iterators = isSerial - ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches) - : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches); + ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches, dataPlan) + : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches, dataPlan); estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); estimateInfoTimestamp = iterators.getEstimateInfoTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 380037f..0bc606e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -63,8 +63,6 @@ import org.apache.phoenix.parse.HintNode.Hint; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.QueryConstants; -import org.apache.phoenix.query.QueryServices; -import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; @@ -114,6 +112,7 @@ public abstract class BaseQueryPlan implements QueryPlan { * immediately before creating the ResultIterator. */ protected final Expression dynamicFilter; + protected final QueryPlan dataPlan; protected Long estimatedRows; protected Long estimatedSize; protected Long estimateInfoTimestamp; @@ -124,7 +123,7 @@ public abstract class BaseQueryPlan implements QueryPlan { StatementContext context, FilterableStatement statement, TableRef table, RowProjector projection, ParameterMetaData paramMetaData, Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory, - Expression dynamicFilter) { + Expression dynamicFilter, QueryPlan dataPlan) { this.context = context; this.statement = statement; this.tableRef = table; @@ -137,6 +136,7 @@ public abstract class BaseQueryPlan implements QueryPlan { this.groupBy = groupBy; this.parallelIteratorFactory = parallelIteratorFactory; this.dynamicFilter = dynamicFilter; + this.dataPlan = dataPlan; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 1d1332d..c9abb69 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -56,7 +56,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { public LiteralResultIterationPlan(Iterable<Tuple> tuples, StatementContext context, FilterableStatement statement, TableRef tableRef, RowProjector projection, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { - super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null); + super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null, null); this.tuples = tuples; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 31d7097..d63950c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -93,14 +94,17 @@ public class ScanPlan extends BaseQueryPlan { private Long serialBytesEstimate; private Long serialEstimateInfoTs; - public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { - this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null); + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, + Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, + QueryPlan dataPlan) throws SQLException { + this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan); } - private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { + private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, + OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? parallelIteratorFactory : - buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter); + buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan); this.allowPageFilter = allowPageFilter; boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); if (isOrdered) { // TopN @@ -260,11 +264,11 @@ public class ScanPlan extends BaseQueryPlan { && isDataToScanWithinThreshold; BaseResultIterators iterators; if (isOffsetOnServer) { - iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches); + iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); } else if (isSerial) { - iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches); + iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); } else { - iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches); + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches, dataPlan); } estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index bd67fa8..25722a9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -37,6 +37,7 @@ import java.io.DataInputStream; import java.io.EOFException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.Iterator; @@ -93,6 +94,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; @@ -101,9 +103,11 @@ import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; @@ -157,6 +161,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private Scan scan; private final boolean useStatsForParallelization; protected Map<ImmutableBytesPtr,ServerCache> caches; + private final QueryPlan dataPlan; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -473,13 +478,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset); this.plan = plan; this.scan = scan; this.caches = caches; this.scanGrouper = scanGrouper; + this.dataPlan = dataPlan; StatementContext context = plan.getContext(); // Clone MutationState as the one on the connection will change if auto commit is on // yet we need the original one with the original transaction from TableResultIterator. @@ -681,6 +687,173 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private long rowsEstimate; } + private int computeColumnsInCommon() { + PTable dataTable; + if ((dataTable=dataPlan.getTableRef().getTable()).getBucketNum() != null) { // unable to compute prefix range for salted data table + return 0; + } + + PTable table = getTable(); + int nColumnsOffset = dataTable.isMultiTenant() ? 1 :0; + int nColumnsInCommon = nColumnsOffset; + List<PColumn> dataPKColumns = dataTable.getPKColumns(); + List<PColumn> indexPKColumns = table.getPKColumns(); + int nIndexPKColumns = indexPKColumns.size(); + int nDataPKColumns = dataPKColumns.size(); + // Skip INDEX_ID and tenant ID columns + for (int i = 1 + nColumnsInCommon; i < nIndexPKColumns; i++) { + PColumn indexColumn = indexPKColumns.get(i); + String indexColumnName = indexColumn.getName().getString(); + String cf = IndexUtil.getDataColumnFamilyName(indexColumnName); + if (cf.length() != 0) { + break; + } + if (i > nDataPKColumns) { + break; + } + PColumn dataColumn = dataPKColumns.get(i-1); + String dataColumnName = dataColumn.getName().getString(); + // Ensure both name and type are the same. Because of the restrictions we have + // on PK column types (namely that you can only have a fixed width nullable + // column as your last column), the type check is more of a sanity check + // since it wouldn't make sense to have an index with every column in common. + if (indexColumn.getDataType() == dataColumn.getDataType() + && dataColumnName.equals(IndexUtil.getDataColumnName(indexColumnName))) { + nColumnsInCommon++; + continue; + } + break; + } + return nColumnsInCommon; + } + + // public for testing + public static ScanRanges computePrefixScanRanges(ScanRanges dataScanRanges, int nColumnsInCommon) { + if (nColumnsInCommon == 0) { + return ScanRanges.EVERYTHING; + } + + int offset = 0; + List<List<KeyRange>> cnf = Lists.newArrayListWithExpectedSize(nColumnsInCommon); + int[] slotSpan = new int[nColumnsInCommon]; + boolean useSkipScan = false; + boolean hasRange = false; + List<List<KeyRange>> rangesList = dataScanRanges.getRanges(); + int rangesListSize = rangesList.size(); + while (offset < nColumnsInCommon && offset < rangesListSize) { + List<KeyRange> ranges = rangesList.get(offset); + // We use a skip scan if we have multiple ranges or if + // we have a non single key range before the last range. + useSkipScan |= ranges.size() > 1 || hasRange; + cnf.add(ranges); + int rangeSpan = 1 + dataScanRanges.getSlotSpans()[offset]; + if (offset + rangeSpan > nColumnsInCommon) { + rangeSpan = nColumnsInCommon - offset; + // trim range to only be rangeSpan in length + ranges = Lists.newArrayListWithExpectedSize(cnf.get(cnf.size()-1).size()); + for (KeyRange range : cnf.get(cnf.size()-1)) { + range = clipRange(dataScanRanges.getSchema(), offset, rangeSpan, range); + // trim range to be only rangeSpan in length + ranges.add(range); + } + cnf.set(cnf.size()-1, ranges); + } + for (KeyRange range : ranges) { + if (!range.isSingleKey()) { + hasRange = true; + } + } + slotSpan[offset] = rangeSpan - 1; + offset = offset + rangeSpan; + } + useSkipScan &= dataScanRanges.useSkipScanFilter(); + KeyRange minMaxRange = + clipRange(dataScanRanges.getSchema(), 0, nColumnsInCommon, dataScanRanges.getMinMaxRange()); + slotSpan = slotSpan.length == cnf.size() ? slotSpan : Arrays.copyOf(slotSpan, cnf.size()); + ScanRanges commonScanRanges = ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, minMaxRange, null, useSkipScan, -1); + return commonScanRanges; + } + + /** + * Truncates range to be a max of rangeSpan fields + * @param schema row key schema + * @param fieldIndex starting index of field with in the row key schema + * @param rangeSpan maximum field length + * @return the same range if unchanged and otherwise a new range + */ + public static KeyRange clipRange(RowKeySchema schema, int fieldIndex, int rangeSpan, KeyRange range) { + if (range == KeyRange.EVERYTHING_RANGE) { + return range; + } + if (range == KeyRange.EMPTY_RANGE) { + return range; + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + boolean newRange = false; + boolean lowerUnbound = range.lowerUnbound(); + boolean lowerInclusive = range.isLowerInclusive(); + byte[] lowerRange = range.getLowerRange(); + if (!lowerUnbound && lowerRange.length > 0) { + if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, lowerRange, ptr, true)) { + // Make lower range inclusive since we're decreasing the range by chopping the last part off + lowerInclusive = true; + lowerRange = ptr.copyBytes(); + newRange = true; + } + } + boolean upperUnbound = range.upperUnbound(); + boolean upperInclusive = range.isUpperInclusive(); + byte[] upperRange = range.getUpperRange(); + if (!upperUnbound && upperRange.length > 0) { + if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, upperRange, ptr, false)) { + // Make lower range inclusive since we're decreasing the range by chopping the last part off + upperInclusive = true; + upperRange = ptr.copyBytes(); + newRange = true; + } + } + + return newRange ? KeyRange.getKeyRange(lowerRange, lowerInclusive, upperRange, upperInclusive) : range; + } + + private static boolean clipKeyRangeBytes(RowKeySchema schema, int fieldIndex, int rangeSpan, byte[] rowKey, ImmutableBytesWritable ptr, boolean trimTrailingNulls) { + int position = 0; + int maxOffset = schema.iterator(rowKey, ptr); + byte[] newRowKey = new byte[rowKey.length]; + int offset = 0; + int trailingNullsToTrim = 0; + do { + if (schema.next(ptr, fieldIndex, maxOffset) == null) { + break; + } + System.arraycopy(ptr.get(), ptr.getOffset(), newRowKey, offset, ptr.getLength()); + offset += ptr.getLength(); + Field field = schema.getField(fieldIndex); + if (field.getDataType().isFixedWidth()) { + trailingNullsToTrim = 0; + } else { + boolean isNull = ptr.getLength() == 0; + byte sepByte = SchemaUtil.getSeparatorByte(true, isNull, field); + newRowKey[offset++] = sepByte; + if (isNull) { + if (trimTrailingNulls) { + trailingNullsToTrim++; + } else { + trailingNullsToTrim = 0; + } + } else { + // So that last zero separator byte is always trimmed + trailingNullsToTrim = 1; + } + } + fieldIndex++; + } while (++position < rangeSpan); + // remove trailing nulls + ptr.set(newRowKey, 0, offset - trailingNullsToTrim); + // return true if we've clipped the rowKey + return maxOffset != offset; + } + /** * Compute the list of parallel scans to run for a given query. The inner scans * may be concatenated together directly, while the other ones may need to be @@ -702,26 +875,43 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // case we generate an empty guide post with the byte estimate being set as guide post // width. boolean emptyGuidePost = gps.isEmptyGuidePost(); + byte[] startRegionBoundaryKey = startKey; + byte[] stopRegionBoundaryKey = stopKey; + int columnsInCommon = 0; + ScanRanges prefixScanRanges = ScanRanges.EVERYTHING; boolean traverseAllRegions = isSalted || isLocalIndex; - if (!traverseAllRegions) { + if (isLocalIndex) { + // TODO: when implementing PHOENIX-4585, we should change this to an assert + // as we should always have a data plan when a local index is being used. + if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check + prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon()); + KeyRange prefixRange = prefixScanRanges.getScanRange(); + if (!prefixRange.lowerUnbound()) { + startRegionBoundaryKey = prefixRange.getLowerRange(); + } + if (!prefixRange.upperUnbound()) { + stopRegionBoundaryKey = prefixRange.getUpperRange(); + } + } + } else if (!traverseAllRegions) { byte[] scanStartRow = scan.getStartRow(); if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { - startKey = scanStartRow; + startRegionBoundaryKey = startKey = scanStartRow; } byte[] scanStopRow = scan.getStopRow(); if (stopKey.length == 0 || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) { - stopKey = scanStopRow; + stopRegionBoundaryKey = stopKey = scanStopRow; } } int regionIndex = 0; int stopIndex = regionBoundaries.size(); - if (startKey.length > 0) { - regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + if (startRegionBoundaryKey.length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey); } - if (stopKey.length > 0) { - stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + if (stopRegionBoundaryKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey)); if (isLocalIndex) { stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); } @@ -771,15 +961,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result HRegionLocation regionLocation = regionLocations.get(regionIndex); HRegionInfo regionInfo = regionLocation.getRegionInfo(); byte[] currentGuidePostBytes = currentGuidePost.copyBytes(); - byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY; + byte[] endKey; if (regionIndex == stopIndex) { endKey = stopKey; } else { endKey = regionBoundaries.get(regionIndex); } if (isLocalIndex) { - endRegionKey = regionInfo.getEndKey(); - keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); + // Only attempt further pruning if the prefix range is using + // a skip scan since we've already pruned the range of regions + // based on the start/stop key. + if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) { + byte[] regionStartKey = regionInfo.getStartKey(); + ImmutableBytesWritable ptr = context.getTempPtr(); + clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false); + regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr); + // Prune this region if there's no intersection + if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) { + currentKeyBytes = endKey; + regionIndex++; + continue; + } + } + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey()); } byte[] initialKeyBytes = currentKeyBytes; while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 06ac3c0..265e213 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -265,7 +265,8 @@ public abstract class ExplainTable { if (minMaxRange != KeyRange.EVERYTHING_RANGE) { RowKeySchema schema = tableRef.getTable().getRowKeySchema(); if (!minMaxRange.isUnbound(bound)) { - minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound)); + // Use scan ranges from ScanRanges since it will have been intersected with minMaxRange + minMaxIterator = new RowKeyValueIterator(schema, scanRanges.getScanRange().getRange(bound)); } } boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 3c11f4a..3a4b084 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -57,16 +57,16 @@ public class ParallelIterators extends BaseResultIterators { private final ParallelIteratorFactory iteratorFactory; private final boolean initFirstScanOnly; - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws SQLException { - super(plan, perScanLimit, null, scanGrouper, scan,caches); + super(plan, perScanLimit, null, scanGrouper, scan,caches, dataPlan); this.iteratorFactory = iteratorFactory; this.initFirstScanOnly = initFirstScanOnly; } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws SQLException { - this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches); + this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 26d1ed1..f94a7c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -62,9 +62,9 @@ public class SerialIterators extends BaseResultIterators { private final Integer offset; public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset, - ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches) + ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map<ImmutableBytesPtr,ServerCache> caches, QueryPlan dataPlan) throws SQLException { - super(plan, perScanLimit, offset, scanGrouper, scan, caches); + super(plan, perScanLimit, offset, scanGrouper, scan, caches, dataPlan); this.offset = offset; // must be a offset or a limit specified or a SERIAL hint Preconditions.checkArgument( http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index b637173..6d203c9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -476,7 +476,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { select = StatementNormalizer.normalize(transformedSelect, resolver); } - QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile(); + QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, null).compile(); plan.getContext().getSequenceManager().validateSequences(seqAction); return plan; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 4192869..5cc415d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -234,7 +234,7 @@ public class QueryOptimizer { try { // translate nodes that match expressions that are indexed to the associated column parse node indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes())); - QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected); + QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan); QueryPlan plan = compiler.compile(); // If query doesn't have where clause and some of columns to project are missing @@ -303,7 +303,7 @@ public class QueryOptimizer { query = SubqueryRewriter.transform(query, queryResolver, statement.getConnection()); queryResolver = FromCompiler.getResolverForQuery(query, statement.getConnection()); query = StatementNormalizer.normalize(query, queryResolver); - QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected).compile(); + QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile(); return plan; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 3154f86..d25299a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -81,6 +81,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -237,7 +238,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public MetaDataMutationResult createTable(List<Mutation> tableMetaData, byte[] physicalName, PTableType tableType, Map<String, Object> tableProps, List<Pair<byte[], Map<String, Object>>> families, byte[][] splits, boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException { - if (splits != null) { + if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) { + Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME); + List<HRegionLocation> regionLocations = tableSplits.get(dataTableName); + byte[] tableName = getTableName(tableMetaData, physicalName); + tableSplits.put(Bytes.toString(tableName), regionLocations); + } else if (splits != null) { byte[] tableName = getTableName(tableMetaData, physicalName); tableSplits.put(Bytes.toString(tableName), generateRegionLocations(tableName, splits)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 5a672ba..1d61003 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java @@ -4161,4 +4161,230 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest { assertEquals(e.getErrorCode(), SQLExceptionCode.CONNECTION_CLOSED.getErrorCode()); } } + + @Test + public void testSingleColLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,C)"); + String query = "SELECT * FROM T WHERE A = 'B' and C='C'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(1, outerScans.size()); + List<Scan> innerScans = outerScans.get(0); + assertEquals(1, innerScans.size()); + Scan scan = innerScans.get(0); + assertEquals("A", Bytes.toString(scan.getStartRow()).trim()); + assertEquals("C", Bytes.toString(scan.getStopRow()).trim()); + } + } + + @Test + public void testMultiColLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " D CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C,\n" + + " D\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)"); + String query = "SELECT * FROM T WHERE A = 'C' and B = 'X' and D='C'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(1, outerScans.size()); + List<Scan> innerScans = outerScans.get(0); + assertEquals(1, innerScans.size()); + Scan scan = innerScans.get(0); + assertEquals("C", Bytes.toString(scan.getStartRow()).trim()); + assertEquals("E", Bytes.toString(scan.getStopRow()).trim()); + } + } + + @Test + public void testSkipScanLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " D CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C,\n" + + " D\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)"); + String query = "SELECT * FROM T WHERE A IN ('A','G') and B = 'A' and D = 'D'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(2, outerScans.size()); + List<Scan> innerScans1 = outerScans.get(0); + assertEquals(1, innerScans1.size()); + Scan scan1 = innerScans1.get(0); + assertEquals("A", Bytes.toString(scan1.getStartRow()).trim()); + assertEquals("C", Bytes.toString(scan1.getStopRow()).trim()); + List<Scan> innerScans2 = outerScans.get(1); + assertEquals(1, innerScans2.size()); + Scan scan2 = innerScans2.get(0); + assertEquals("G", Bytes.toString(scan2.getStartRow()).trim()); + assertEquals("I", Bytes.toString(scan2.getStopRow()).trim()); + } + } + + @Test + public void testRVCLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " D CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C,\n" + + " D\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)"); + String query = "SELECT * FROM T WHERE A='I' and (B,D) IN (('A','D'),('B','I'))"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(1, outerScans.size()); + List<Scan> innerScans = outerScans.get(0); + assertEquals(1, innerScans.size()); + Scan scan = innerScans.get(0); + assertEquals("I", Bytes.toString(scan.getStartRow()).trim()); + assertEquals(0, scan.getStopRow().length); + } + } + + @Test + public void testRVCLocalIndexPruning2() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B VARCHAR,\n" + + " C VARCHAR,\n" + + " D VARCHAR,\n" + + " E VARCHAR,\n" + + " F VARCHAR,\n" + + " G VARCHAR,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C,\n" + + " D,\n" + + " E,\n" + + " F,\n" + + " G\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,C,F,G)"); + String query = "SELECT * FROM T WHERE (A,B,C,D) IN (('I','D','F','X'),('I','I','G','Y')) and F='X' and G='Y'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(1, outerScans.size()); + List<Scan> innerScans = outerScans.get(0); + assertEquals(1, innerScans.size()); + Scan scan = innerScans.get(0); + assertEquals("I", Bytes.toString(scan.getStartRow()).trim()); + assertEquals(0, scan.getStopRow().length); + } + } + + @Test + public void testMinMaxRangeLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " D CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C,\n" + + " D\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)"); + String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > ('C','B','X') and D='C'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(1, outerScans.size()); + List<Scan> innerScans = outerScans.get(0); + assertEquals(1, innerScans.size()); + Scan scan = innerScans.get(0); + assertEquals("C", Bytes.toString(scan.getStartRow()).trim()); + assertEquals("E", Bytes.toString(scan.getStopRow()).trim()); + } + } + + @Test + public void testNoLocalIndexPruning() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute("CREATE TABLE T (\n" + + " A CHAR(1) NOT NULL,\n" + + " B CHAR(1) NOT NULL,\n" + + " C CHAR(1) NOT NULL,\n" + + " CONSTRAINT PK PRIMARY KEY (\n" + + " A,\n" + + " B,\n" + + " C\n" + + " )\n" + + ") SPLIT ON ('A','C','E','G','I')"); + conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(C)"); + String query = "SELECT * FROM T WHERE C='C'"; + PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class); + QueryPlan plan = statement.optimizeQuery(query); + assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString()); + plan.iterator(); + List<List<Scan>> outerScans = plan.getScans(); + assertEquals(6, outerScans.size()); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java new file mode 100644 index 0000000..abc435a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.query; + +import static org.apache.phoenix.query.KeyRange.UNBOUND; +import static org.apache.phoenix.query.QueryConstants.DESC_SEPARATOR_BYTE_ARRAY; +import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY; +import static org.junit.Assert.assertEquals; + +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.iterate.BaseResultIterators; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PSmallint; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.collect.Lists; + + +/** + * Test for intersect method in {@link SkipScanFilter} + */ +@RunWith(Parameterized.class) +public class KeyRangeClipTest extends BaseConnectionlessQueryTest { + private final RowKeySchema schema; + private final KeyRange input; + private final KeyRange expectedOutput; + private final int clipTo; + + private static byte[] getRange(PhoenixConnection pconn, List<Object> startValues) throws SQLException { + byte[] lowerRange; + if (startValues == null) { + lowerRange = KeyRange.UNBOUND; + } else { + String upsertValues = StringUtils.repeat("?,", startValues.size()).substring(0,startValues.size() * 2 - 1); + String upsertStmt = "UPSERT INTO T VALUES(" + upsertValues + ")"; + PreparedStatement stmt = pconn.prepareStatement(upsertStmt); + for (int i = 0; i < startValues.size(); i++) { + stmt.setObject(i+1, startValues.get(i)); + } + stmt.execute(); + Cell startCell = PhoenixRuntime.getUncommittedDataIterator(pconn).next().getSecond().get(0); + lowerRange = CellUtil.cloneRow(startCell); + pconn.rollback(); + } + return lowerRange; + } + + public KeyRangeClipTest(String tableDef, List<Object> startValues, List<Object> endValues, int clipTo, KeyRange expectedOutput) throws SQLException { + PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class); + pconn.createStatement().execute("CREATE TABLE T(" + tableDef+ ")"); + PTable table = pconn.getMetaDataCache().getTableRef(new PTableKey(null,"T")).getTable(); + this.schema = table.getRowKeySchema(); + byte[] lowerRange = getRange(pconn, startValues); + byte[] upperRange = getRange(pconn, endValues); + this.input = KeyRange.getKeyRange(lowerRange, upperRange); + this.expectedOutput = expectedOutput; + this.clipTo = clipTo; + } + + @After + public void cleanup() throws SQLException { + PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class); + pconn.createStatement().execute("DROP TABLE T"); + } + + @Test + public void test() { + ScanRanges scanRanges = ScanRanges.create(schema, Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(input)), new int[] {schema.getFieldCount()-1}, KeyRange.EVERYTHING_RANGE, null, false, -1); + ScanRanges clippedRange = BaseResultIterators.computePrefixScanRanges(scanRanges, clipTo); + assertEquals(expectedOutput, clippedRange.getScanRange()); + } + + @Parameters(name="KeyRangeClipTest_{0}") + public static Collection<Object> data() { + List<Object> testCases = Lists.newArrayList(); + testCases.add(Lists.newArrayList( // [XY - *] + "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)", + Lists.newArrayList("XY",null,"Z"), null, 2, + KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)", + null, Lists.newArrayList("XY",null,"Z"), 2, + KeyRange.getKeyRange( + ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower + ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)", + Lists.newArrayList("XY",null,null,"Z"), null, 3, + KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)", + null, Lists.newArrayList("XY",null,null,"Z"), 3, + KeyRange.getKeyRange( + ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower + ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray()); + testCases.add(Lists.newArrayList( + "A CHAR(1) NOT NULL, B CHAR(1) NOT NULL, C CHAR(1) NOT NULL, CONSTRAINT PK PRIMARY KEY (A,B,C)", + Lists.newArrayList("A","B","C"), Lists.newArrayList("C","D","E"), 2, + KeyRange.getKeyRange(Bytes.toBytes("AB"), true, ByteUtil.nextKey(Bytes.toBytes("CD")), false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B VARCHAR, C SMALLINT NOT NULL, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)", + Lists.<Object>newArrayList("XY",null,1,"Z"), null, 3, + KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XY"), SEPARATOR_BYTE_ARRAY, SEPARATOR_BYTE_ARRAY, PSmallint.INSTANCE.toBytes(1)), true, UNBOUND, false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B BIGINT NOT NULL, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B DESC,C)", + Lists.<Object>newArrayList("XYZ",1,"Z"), null, 2, + KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XYZ"), SEPARATOR_BYTE_ARRAY, PLong.INSTANCE.toBytes(1, SortOrder.DESC)), true, UNBOUND, false)).toArray()); + testCases.add(Lists.newArrayList( + "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A DESC,B,C)", + null, Lists.newArrayList("XY",null,"Z"), 3, + KeyRange.getKeyRange( + ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower + (ByteUtil.concat(PVarchar.INSTANCE.toBytes("XY",SortOrder.DESC),DESC_SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,Bytes.toBytes("Z"))), false)).toArray()); + return testCases; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b693e93/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 0f12d9c..1903dda 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -493,7 +493,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { return Cost.ZERO; } - }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null); + }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null); List<KeyRange> keyRanges = parallelIterators.getSplits(); return keyRanges; }