Repository: phoenix Updated Branches: refs/heads/master de9a2c7b0 -> a31b70179
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 3237882..77f4411 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -39,7 +39,6 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -64,6 +63,7 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.Closeables; @@ -402,8 +402,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } Region region = c.getEnvironment().getRegion(); - region.startRegionOperation(); + boolean aquiredLock = false; try { + region.startRegionOperation(); + aquiredLock = true; synchronized (scanner) { do { List<Cell> results = new ArrayList<Cell>(); @@ -423,8 +425,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } } while (hasMore && groupByCache.size() < limit); } - } finally { - region.closeRegionOperation(); + } finally { + if (aquiredLock) region.closeRegionOperation(); } RegionScanner regionScanner = groupByCache.getScanner(scanner); @@ -472,8 +474,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // start of a new row. Otherwise, we have to wait until an agg int countOffset = rowAggregators.length == 0 ? 1 : 0; Region region = c.getEnvironment().getRegion(); - region.startRegionOperation(); + boolean aquiredLock = false; try { + region.startRegionOperation(); + aquiredLock = true; synchronized (scanner) { do { List<Cell> kvs = new ArrayList<Cell>(); @@ -505,7 +509,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } while (hasMore && !aggBoundary && !atLimit); } } finally { - region.closeRegionOperation(); + if (aquiredLock) region.closeRegionOperation(); } if (currentKey != null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 77b8b3e..7e052f5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -86,6 +86,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; import org.apache.phoenix.schema.stats.StatisticsCollector; @@ -288,8 +289,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } long rowCount = 0; final RegionScanner innerScanner = theScanner; - region.startRegionOperation(); + boolean aquiredLock = false; try { + region.startRegionOperation(); + aquiredLock = true; synchronized (innerScanner) { do { List<Cell> results = new ArrayList<Cell>(); @@ -529,7 +532,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver try { innerScanner.close(); } finally { - region.closeRegionOperation(); + if (aquiredLock) region.closeRegionOperation(); } } if (logger.isDebugEnabled()) { @@ -608,7 +611,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver InternalScanner internalScanner = scanner; if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { try { - Pair<HRegionInfo, HRegionInfo> mergeRegions = null; long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( c.getEnvironment(), table.getNameAsString(), clientTimeStamp, http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index b125ecc..e70967c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -121,7 +122,7 @@ public class AggregatePlan extends BaseQueryPlan { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException { Expression expression = RowKeyExpression.INSTANCE; OrderByExpression orderByExpression = new OrderByExpression(expression, false, true); int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); @@ -138,9 +139,9 @@ public class AggregatePlan extends BaseQueryPlan { this.outerFactory = outerFactory; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { - PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName); - return outerFactory.newIterator(context, iterator, scan, tableName); + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException { + PeekingResultIterator iterator = innerFactory.newIterator(context, scanner, scan, tableName, plan); + return outerFactory.newIterator(context, iterator, scan, tableName, plan); } } @@ -164,12 +165,12 @@ public class AggregatePlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { if (groupBy.isEmpty()) { - UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan()); + UngroupedAggregateRegionObserver.serializeIntoScan(scan); } else { // Set attribute with serialized expressions for coprocessor - GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), groupBy.getScanAttribName(), groupBy.getKeyExpressions()); + GroupedAggregateRegionObserver.serializeIntoScan(scan, groupBy.getScanAttribName(), groupBy.getKeyExpressions()); if (limit != null && orderBy.getOrderByExpressions().isEmpty() && having == null && ( ( statement.isDistinct() && ! statement.isAggregate() ) || ( ! statement.isDistinct() && ( context.getAggregationManager().isEmpty() @@ -200,8 +201,8 @@ public class AggregatePlan extends BaseQueryPlan { } } BaseResultIterators iterators = statement.getHint().hasHint(HintNode.Hint.SERIAL) - ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper) - : new ParallelIterators(this, null, wrapParallelIteratorFactory()); + ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan) + : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan); splits = iterators.getSplits(); scans = iterators.getScans(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index cedd23e..83e55ee 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -203,26 +203,30 @@ public abstract class BaseQueryPlan implements QueryPlan { @Override public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper); + return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, this.context.getScan()); } - + + @Override + public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, scan); + } + @Override public final ResultIterator iterator() throws SQLException { - return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance()); + return iterator(Collections.<SQLCloseable>emptyList(), DefaultParallelScanGrouper.getInstance(), this.context.getScan()); } - public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper) throws SQLException { + public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { if (context.getScanRanges() == ScanRanges.NOTHING) { return ResultIterator.EMPTY_ITERATOR; } if (tableRef == TableRef.EMPTY_TABLE_REF) { - return newIterator(scanGrouper); + return newIterator(scanGrouper, scan); } // Set miscellaneous scan attributes. This is the last chance to set them before we // clone the scan for each parallelized chunk. - Scan scan = context.getScan(); TableRef tableRef = context.getCurrentTable(); PTable table = tableRef.getTable(); @@ -319,7 +323,7 @@ public abstract class BaseQueryPlan implements QueryPlan { LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection)); } - ResultIterator iterator = newIterator(scanGrouper); + ResultIterator iterator = newIterator(scanGrouper, scan); iterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) { @Override @@ -448,7 +452,7 @@ public abstract class BaseQueryPlan implements QueryPlan { } } - abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException; + abstract protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException; @Override public long getEstimatedSize() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index f4e374e..7871f9e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; @@ -83,7 +84,12 @@ public class ClientAggregatePlan extends ClientProcessingPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - ResultIterator iterator = delegate.iterator(scanGrouper); + return iterator(scanGrouper, delegate.getContext().getScan()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { iterator = new FilterResultIterator(iterator, where); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java index 003c995..4e43225 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java @@ -20,6 +20,7 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; @@ -50,7 +51,11 @@ public class ClientScanPlan extends ClientProcessingPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - ResultIterator iterator = delegate.iterator(scanGrouper); + return iterator(scanGrouper, delegate.getContext().getScan()); + } + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + ResultIterator iterator = delegate.iterator(scanGrouper, scan); if (where != null) { iterator = new FilterResultIterator(iterator, where); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java index 1b0af8c..fc5a04d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; @@ -103,7 +104,12 @@ public class CorrelatePlan extends DelegateQueryPlan { } @Override - public ResultIterator iterator(ParallelScanGrouper scanGrouper) + public ResultIterator iterator(ParallelScanGrouper scanGrouper) + throws SQLException { + return iterator(scanGrouper, null); + } + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { return new ResultIterator() { private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java index 36b725e..5887ff3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java @@ -52,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 5fdec46..e3d9721 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -140,6 +140,11 @@ public class HashJoinPlan extends DelegateQueryPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { + return iterator(scanGrouper, this.delegate.getContext().getScan()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { int count = subPlans.length; PhoenixConnection connection = getContext().getConnection(); ConnectionQueryServices services = connection.getQueryServices(); @@ -216,11 +221,10 @@ public class HashJoinPlan extends DelegateQueryPlan { } if (joinInfo != null) { - Scan scan = delegate.getContext().getScan(); HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo); } - ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper); + ResultIterator iterator = joinInfo == null ? delegate.iterator(scanGrouper) : ((BaseQueryPlan) delegate).iterator(dependencies, scanGrouper, scan); if (statement.getInnerSelectStatement() != null && postFilter != null) { iterator = new FilterResultIterator(iterator, postFilter); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index fe767d9..db99964 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -71,7 +71,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { ResultIterator scanner = new ResultIterator() { private final Iterator<Tuple> tupleIterator = tuples.iterator(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 93ae5d6..1f18ddd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -188,9 +188,8 @@ public class ScanPlan extends BaseQueryPlan { } @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException { + protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { // Set any scan attributes before creating the scanner, as it will be too late afterwards - Scan scan = context.getScan(); scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, QueryConstants.TRUE); ResultIterator scanner; TableRef tableRef = this.getTableRef(); @@ -208,11 +207,11 @@ public class ScanPlan extends BaseQueryPlan { BaseResultIterators iterators; boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType()); if (isOffsetOnServer) { - iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper); + iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan); } else if (isSerial) { - iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper); + iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan); } else { - iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper); + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan); } splits = iterators.getSplits(); scans = iterators.getScans(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index e181e80..8e0e6e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -131,7 +131,12 @@ public class SortMergeJoinPlan implements QueryPlan { } @Override - public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { + public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { + return iterator(scanGrouper, null); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { return type == JoinType.Semi || type == JoinType.Anti ? new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)) : new BasicJoinIterator(lhsPlan.iterator(scanGrouper), rhsPlan.iterator(scanGrouper)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java index e8d9af0..0ba0cc1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java @@ -20,6 +20,7 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.expression.Expression; @@ -60,7 +61,12 @@ public class TupleProjectionPlan extends DelegateQueryPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper)) { + return iterator(scanGrouper, delegate.getContext().getScan()); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + ResultIterator iterator = new DelegateResultIterator(delegate.iterator(scanGrouper, scan)) { @Override public Tuple next() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index 808141e..cf95b5b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -139,7 +139,11 @@ public class UnionPlan implements QueryPlan { public final ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { return iterator(Collections.<SQLCloseable>emptyList()); } - + + @Override + public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return iterator(Collections.<SQLCloseable>emptyList()); + } @Override public final ResultIterator iterator() throws SQLException { return iterator(Collections.<SQLCloseable>emptyList()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java index 8905eef..94c59df 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java @@ -20,6 +20,7 @@ package org.apache.phoenix.execute; import java.sql.SQLException; import java.util.List; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.QueryPlan; @@ -53,7 +54,12 @@ public class UnnestArrayPlan extends DelegateQueryPlan { @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { - return new UnnestArrayResultIterator(delegate.iterator(scanGrouper)); + return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, delegate.getContext().getScan())); + } + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, scan)); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 0299f18..0cf2faa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,8 +17,10 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; + import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; @@ -116,7 +118,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private final List<KeyRange> splits; private final PTableStats tableStats; private final byte[] physicalTableName; - private final QueryPlan plan; + protected final QueryPlan plan; protected final String scanId; protected final MutationState mutationState; private final ParallelScanGrouper scanGrouper; @@ -125,6 +127,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private Long estimatedRows; private Long estimatedSize; private boolean hasGuidePosts; + private Scan scan; static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new Function<HRegionLocation, KeyRange>() { @Override @@ -138,7 +141,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } private boolean useStats() { - Scan scan = context.getScan(); boolean isPointLookup = context.getScanRanges().isPointLookup(); /* * Don't use guide posts if: @@ -153,11 +155,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return true; } - private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset) { + private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) { StatementContext context = plan.getContext(); TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); - Scan scan = context.getScan(); Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap(); // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys @@ -331,10 +332,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper) throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), plan.getLimit(), plan instanceof ScanPlan ? plan.getOffset() : null); this.plan = plan; + this.scan = scan; this.scanGrouper = scanGrouper; StatementContext context = plan.getContext(); // Clone MutationState as the one on the connection will change if auto commit is on @@ -347,7 +349,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Used to tie all the scans together during logging scanId = UUID.randomUUID().toString(); - initializeScan(plan, perScanLimit, offset); + initializeScan(plan, perScanLimit, offset, scan); this.scans = getParallelScans(); List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); @@ -471,10 +473,69 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } private List<List<Scan>> getParallelScans() throws SQLException { + // If the scan boundaries are not matching with scan in context that means we need to get + // parallel scans for the chunk after split/merge. + if (!ScanUtil.isConextScan(scan, context)) { + return getParallelScans(scan); + } return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY); } /** + * Get parallel scans of the specified scan boundaries. This can be used for getting parallel + * scans when there is split/merges while scanning a chunk. In this case we need not go by all + * the regions or guideposts. + * @param scan + * @return + * @throws SQLException + */ + private List<List<Scan>> getParallelScans(Scan scan) throws SQLException { + List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() + .getAllTableRegions(physicalTableName); + List<byte[]> regionBoundaries = toBoundaries(regionLocations); + int regionIndex = 0; + int stopIndex = regionBoundaries.size(); + if (scan.getStartRow().length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, scan.getStartRow()); + } + if (scan.getStopRow().length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), scan.getStopRow())); + } + List<List<Scan>> parallelScans = Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1); + List<Scan> scans = Lists.newArrayListWithExpectedSize(2); + while (regionIndex <= stopIndex) { + HRegionLocation regionLocation = regionLocations.get(regionIndex); + HRegionInfo regionInfo = regionLocation.getRegionInfo(); + Scan newScan = ScanUtil.newScan(scan); + byte[] endKey; + if (regionIndex == stopIndex) { + endKey = scan.getStopRow(); + } else { + endKey = regionBoundaries.get(regionIndex); + } + if(ScanUtil.isLocalIndex(scan)) { + ScanUtil.setLocalIndexAttributes(newScan, 0, regionInfo.getStartKey(), + regionInfo.getEndKey(), newScan.getAttribute(SCAN_START_ROW_SUFFIX), + newScan.getAttribute(SCAN_STOP_ROW_SUFFIX)); + } else { + if(Bytes.compareTo(scan.getStartRow(), regionInfo.getStartKey())<=0) { + newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionInfo.getStartKey()); + newScan.setStartRow(regionInfo.getStartKey()); + } + if(scan.getStopRow().length == 0 || (regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), regionInfo.getEndKey())>0)) { + newScan.setStopRow(regionInfo.getEndKey()); + } + } + scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); + regionIndex++; + } + if (!scans.isEmpty()) { // Add any remaining scans + parallelScans.add(scans); + } + return parallelScans; + } + + /** * Compute the list of parallel scans to run for a given query. The inner scans * may be concatenated together directly, while the other ones may need to be * merge sorted, depending on the query. @@ -482,7 +543,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result * @throws SQLException */ private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException { - Scan scan = context.getScan(); List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() .getAllTableRegions(physicalTableName); List<byte[]> regionBoundaries = toBoundaries(regionLocations); @@ -555,6 +615,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Merge bisect with guideposts for all but the last region while (regionIndex <= stopIndex) { + HRegionLocation regionLocation = regionLocations.get(regionIndex); + HRegionInfo regionInfo = regionLocation.getRegionInfo(); byte[] currentGuidePostBytes = currentGuidePost.copyBytes(); byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY; if (regionIndex == stopIndex) { @@ -562,9 +624,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } else { endKey = regionBoundaries.get(regionIndex); } - HRegionLocation regionLocation = regionLocations.get(regionIndex); if (isLocalIndex) { - HRegionInfo regionInfo = regionLocation.getRegionInfo(); endRegionKey = regionInfo.getEndKey(); keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); } @@ -572,6 +632,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) { Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, false); + if(newScan != null) { + ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), + regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); + } if (newScan != null) { estimatedRows += gps.getRowCounts().get(guideIndex); estimatedSize += gps.getByteCounts().get(guideIndex); @@ -584,12 +648,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } catch (EOFException e) {} Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, endKey, keyOffset, true); - if (isLocalIndex) { - if (newScan != null) { - newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); - } else if (!scans.isEmpty()) { - scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); - } + if(newScan != null) { + ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), + regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); } scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); currentKeyBytes = endKey; @@ -628,7 +689,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result */ @Override public List<PeekingResultIterator> getIterators() throws SQLException { - Scan scan = context.getScan(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " + this, ScanUtil.getCustomAnnotations(scan))); @@ -676,7 +736,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result SQLException toThrow = null; int queryTimeOut = context.getStatement().getQueryTimeoutInMillis(); try { - submitWork(scan, futures, allIterators, splitSize); + submitWork(scan, futures, allIterators, splitSize, this.scanGrouper); boolean clearedCache = false; for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) { List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size()); @@ -691,11 +751,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result previousScan.getScan().getStopRow()) < 0) || (isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW), previousScan.getScan().getStopRow()) > 0) - || (scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY) != null - && previousScan.getScan().getAttribute(EXPECTED_UPPER_REGION_KEY) != null - && Bytes.compareTo(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY), - previousScan.getScan() - .getAttribute(EXPECTED_UPPER_REGION_KEY)) == 0))) { + || (Bytes.compareTo(scanPair.getFirst().getStopRow(), + previousScan.getScan().getStopRow()) == 0))) { continue; } PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS); @@ -714,9 +771,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result Scan oldScan = scanPair.getFirst(); byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); byte[] endKey = oldScan.getStopRow(); - if (isLocalIndex) { - endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY); - } List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); // Add any concatIterators that were successful so far @@ -868,7 +922,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result abstract protected String getName(); abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException; + Queue<PeekingResultIterator> allIterators, int estFlattenedSize, ParallelScanGrouper scanGrouper) throws SQLException; @Override public int size() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index a78565d..c4e52f4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -18,7 +18,7 @@ package org.apache.phoenix.iterate; -import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import java.sql.SQLException; @@ -27,6 +27,7 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.query.QueryServices; @@ -56,6 +57,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { private final MutationState mutationState; private Scan scan; private PeekingResultIterator resultIterator; + private QueryPlan plan; public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory { @@ -73,30 +75,31 @@ public class ChunkedResultIterator implements PeekingResultIterator { } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException { if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); return new ChunkedResultIterator(delegateFactory, mutationState, context, tableRef, scan, mutationState.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, - QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner); + QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner, plan); } } private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, MutationState mutationState, - StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException { + StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; this.tableRef = tableRef; this.scan = scan; this.chunkSize = chunkSize; this.mutationState = mutationState; + this.plan = plan; // Instantiate single chunk iterator and the delegate iterator in constructor // to get parallel scans kicked off in separate threads. If we delay this, // we'll get serialized behavior (see PHOENIX- if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize); String tableName = tableRef.getTable().getPhysicalName().getString(); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan); } @Override @@ -123,13 +126,20 @@ public class ChunkedResultIterator implements PeekingResultIterator { if (resultIterator.peek() == null && lastKey != null) { resultIterator.close(); scan = ScanUtil.newScan(scan); - scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); + if(ScanUtil.isLocalIndex(scan)) { + scan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.copyKeyBytesIfNecessary(lastKey)); + } else { + scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); + } if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); String tableName = tableRef.getTable().getPhysicalName().getString(); long renewLeaseThreshold = context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds(); - ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( - new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); + ResultIterator singleChunkResultIterator = + new SingleChunkResultIterator(new TableResultIterator(mutationState, scan, + context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), + renewLeaseThreshold, plan, DefaultParallelScanGrouper.getInstance()), + chunkSize); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName, plan); } return resultIterator; } @@ -163,9 +173,6 @@ public class ChunkedResultIterator implements PeekingResultIterator { // be able to start the next chunk on the next row key if (rowCount == chunkSize) { next.getKey(lastKey); - if (scan.getAttribute(STARTKEY_OFFSET) != null) { - addRegionStartKeyToLaskKey(); - } } else if (rowCount > chunkSize && rowKeyChanged(next)) { chunkComplete = true; return null; @@ -192,29 +199,10 @@ public class ChunkedResultIterator implements PeekingResultIterator { int offset = lastKey.getOffset(); int length = lastKey.getLength(); newTuple.getKey(lastKey); - if (scan.getAttribute(STARTKEY_OFFSET) != null) { - addRegionStartKeyToLaskKey(); - } return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0; } - /** - * Prefix region start key to last key to form actual row key in case of local index scan. - */ - private void addRegionStartKeyToLaskKey() { - byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET); - if (offsetBytes != null) { - int startKeyOffset = Bytes.toInt(offsetBytes); - byte[] actualLastkey = - new byte[startKeyOffset + lastKey.getLength() - lastKey.getOffset()]; - System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, startKeyOffset); - System.arraycopy(lastKey.get(), lastKey.getOffset(), actualLastkey, - startKeyOffset, lastKey.getLength()); - lastKey.set(actualLastkey); - } - } - @Override public String toString() { return "SingleChunkResultIterator [rowCount=" + rowCount http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java index 2258caf..b720b56 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; @@ -28,8 +29,8 @@ public class DefaultTableResultIteratorFactory implements TableResultIteratorFac @Override public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, - CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException { - return new TableResultIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold); + CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + return new TableResultIterator(mutationState, scan, scanMetrics, renewLeaseThreshold, plan, scanGrouper); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java index f25e373..dbe9910 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java @@ -20,15 +20,16 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; public interface ParallelIteratorFactory { public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() { @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException { return LookAheadResultIterator.wrap(scanner); } }; - PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException; + PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index ca0eba0..e40953e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -54,20 +54,20 @@ public class ParallelIterators extends BaseResultIterators { private static final String NAME = "PARALLEL"; private final ParallelIteratorFactory iteratorFactory; - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - super(plan, perScanLimit, null, scanGrouper); + super(plan, perScanLimit, null, scanGrouper, scan); this.iteratorFactory = iteratorFactory; } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan) throws SQLException { - this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance()); + this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan); } @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, ParallelScanGrouper scanGrouper) throws SQLException { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -98,7 +98,7 @@ public class ParallelIterators extends BaseResultIterators { final Scan scan = scanLocation.getScan(); final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); - final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold); + final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold, this.plan, scanGrouper); context.getConnection().addIterator(tableResultItr); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { @@ -109,7 +109,7 @@ public class ParallelIterators extends BaseResultIterators { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } - PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName); + PeekingResultIterator iterator = iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, ParallelIterators.this.plan); // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed. iterator.peek(); allIterators.add(iterator); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 17c2279..1f89ec1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -55,9 +55,9 @@ public class SerialIterators extends BaseResultIterators { private final ParallelIteratorFactory iteratorFactory; public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset, - ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper) + ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - super(plan, perScanLimit, offset, scanGrouper); + super(plan, perScanLimit, offset, scanGrouper, scan); // must be a offset or a limit specified or a SERIAL hint Preconditions.checkArgument( offset != null || perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL)); @@ -66,7 +66,7 @@ public class SerialIterators extends BaseResultIterators { @Override protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, - final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) { + final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final ParallelScanGrouper scanGrouper) { // Pre-populate nestedFutures lists so that we can shuffle the scans // and add the future to the right nested list. By shuffling the scans // we get better utilization of the cluster since our thread executor @@ -90,9 +90,9 @@ public class SerialIterators extends BaseResultIterators { PeekingResultIterator previousIterator = null; List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size()); for (final Scan scan : scans) { - TableResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, previousIterator); + TableResultIterator scanner = new TableResultIterator(mutationState, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, previousIterator, plan, scanGrouper); conn.addIterator(scanner); - PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, tableName); + PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, tableName, plan); concatIterators.add(iterator); previousIterator = iterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 0a3c32b..64e33de 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -36,6 +36,7 @@ import org.apache.commons.io.output.DeferredFileOutputStream; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; @@ -71,7 +72,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException { + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) throws SQLException { ReadMetricQueue readRequestMetric = context.getReadMetricsQueue(); SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName); MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index a86f899..a1ef7da 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.iterate; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; + import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED; import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED; @@ -32,14 +35,17 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.monitoring.CombinableMetric; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import com.google.common.annotations.VisibleForTesting; @@ -59,6 +65,10 @@ public class TableResultIterator implements ResultIterator { private final CombinableMetric scanMetrics; private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR; private final long renewLeaseThreshold; + private final QueryPlan plan; + private final ParallelScanGrouper scanGrouper; + private Tuple lastTuple = null; + private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); @GuardedBy("this") private ResultIterator scanIterator; @@ -76,26 +86,29 @@ public class TableResultIterator implements ResultIterator { this.renewLeaseThreshold = 0; this.htable = null; this.scan = null; + this.plan = null; + this.scanGrouper = null; } public static enum RenewLeaseStatus { RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED }; - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, - long renewLeaseThreshold) throws SQLException { - this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null); + public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, + long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { + this(mutationState,scan,scanMetrics,renewLeaseThreshold,null, plan, scanGrouper); } - public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, - long renewLeaseThreshold, PeekingResultIterator previousIterator) throws SQLException { + public TableResultIterator(MutationState mutationState, Scan scan, CombinableMetric scanMetrics, + long renewLeaseThreshold, PeekingResultIterator previousIterator, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException { this.scan = scan; this.scanMetrics = scanMetrics; - PTable table = tableRef.getTable(); - htable = mutationState.getHTable(table); + this.plan = plan; + htable = mutationState.getHTable(plan.getTableRef().getTable()); this.scanIterator = UNINITIALIZED_SCANNER; this.renewLeaseThreshold = renewLeaseThreshold; this.previousIterator = previousIterator; + this.scanGrouper = scanGrouper; } @Override @@ -116,8 +129,42 @@ public class TableResultIterator implements ResultIterator { @Override public synchronized Tuple next() throws SQLException { initScanner(); - Tuple t = scanIterator.next(); - return t; + try { + lastTuple = scanIterator.next(); + if (lastTuple != null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + lastTuple.getKey(ptr); + } + } catch (SQLException e) { + try { + throw ServerUtil.parseServerException(e); + } catch(StaleRegionBoundaryCacheException e1) { + if(ScanUtil.isNonAggregateScan(scan)) { + // For non aggregate queries if we get stale region boundary exception we can + // continue scanning from the next value of lasted fetched result. + Scan newScan = ScanUtil.newScan(scan); + newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW)); + if(lastTuple != null) { + lastTuple.getKey(ptr); + byte[] startRowSuffix = ByteUtil.copyKeyBytesIfNecessary(ptr); + if(ScanUtil.isLocalIndex(newScan)) { + // If we just set scan start row suffix then server side we prepare + // actual scan boundaries by prefixing the region start key. + newScan.setAttribute(SCAN_START_ROW_SUFFIX, ByteUtil.nextKey(startRowSuffix)); + } else { + newScan.setStartRow(ByteUtil.nextKey(startRowSuffix)); + } + } + plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName()); + this.scanIterator = + plan.iterator(scanGrouper, newScan); + lastTuple = scanIterator.next(); + } else { + throw e; + } + } + } + return lastTuple; } public synchronized void initScanner() throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java index 0f5ac9b..8d7b54d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java @@ -20,10 +20,11 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; public interface TableResultIteratorFactory { - public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold) throws SQLException; + public TableResultIterator newIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 5b799a0..0c154e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -491,6 +491,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { } @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return iterator; + } + + @Override public long getEstimatedSize() { return 0; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 51ac795..17d9b6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -39,6 +39,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.RoundRobinResultIterator; @@ -50,6 +51,7 @@ import org.apache.phoenix.monitoring.ReadMetricQueue; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; + import org.apache.phoenix.query.ConnectionQueryServices; /** @@ -120,8 +122,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null for (Scan scan : scans) { // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), - queryPlan.getTableRef(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold); + final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), scan, readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold, queryPlan, MapReduceParallelScanGrouper.getInstance()); PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); iterators.add(peekingResultIterator); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 46589b9..3b37c69 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -20,6 +20,9 @@ package org.apache.phoenix.util; import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY; import static org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; import java.io.IOException; import java.sql.SQLException; @@ -32,6 +35,7 @@ import java.util.NavigableSet; import java.util.TreeMap; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; @@ -101,6 +105,10 @@ public class ScanUtil { return scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX) != null; } + public static boolean isNonAggregateScan(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null; + } + // Use getTenantId and pass in column name to match against // in as PSchema attribute. If column name matches in // KeyExpressions, set on scan as attribute @@ -616,6 +624,62 @@ public class ScanUtil { } } + /** + * prefix region start key to the start row/stop row suffix and set as scan boundaries. + * @param scan + * @param lowerInclusiveRegionKey + * @param upperExclusiveRegionKey + */ + public static void setupLocalIndexScan(Scan scan, byte[] lowerInclusiveRegionKey, + byte[] upperExclusiveRegionKey) { + byte[] prefix = lowerInclusiveRegionKey.length == 0 ? new byte[upperExclusiveRegionKey.length]: lowerInclusiveRegionKey; + int prefixLength = lowerInclusiveRegionKey.length == 0? upperExclusiveRegionKey.length: lowerInclusiveRegionKey.length; + if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) { + scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength)); + } + if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) { + scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength)); + } + } + + public static byte[] getActualStartRow(Scan localIndexScan, HRegionInfo regionInfo) { + return localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? localIndexScan + .getStartRow() : ScanRanges.prefixKey(localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX), 0 , + regionInfo.getStartKey().length == 0 ? new byte[regionInfo.getEndKey().length] + : regionInfo.getStartKey(), + regionInfo.getStartKey().length == 0 ? regionInfo.getEndKey().length : regionInfo + .getStartKey().length); + } + + /** + * Set all attributes required and boundaries for local index scan. + * @param keyOffset + * @param regionStartKey + * @param regionEndKey + * @param newScan + */ + public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) { + if(ScanUtil.isLocalIndex(newScan)) { + newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey); + newScan.setStartRow(regionStartKey); + newScan.setStopRow(regionEndKey); + if (keyOffset > 0 ) { + newScan.setAttribute(SCAN_START_ROW_SUFFIX, ScanRanges.stripPrefix(startRowSuffix, keyOffset)); + } else { + newScan.setAttribute(SCAN_START_ROW_SUFFIX, startRowSuffix); + } + if (keyOffset > 0) { + newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, ScanRanges.stripPrefix(stopRowSuffix, keyOffset)); + } else { + newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, stopRowSuffix); + } + } + } + + public static boolean isConextScan(Scan scan, StatementContext context) { + return Bytes.compareTo(context.getScan().getStartRow(), scan.getStartRow()) == 0 && Bytes + .compareTo(context.getScan().getStopRow(), scan.getStopRow()) == 0; + } public static int getRowKeyOffset(byte[] regionStartKey, byte[] regionEndKey) { return regionStartKey.length > 0 ? regionStartKey.length : regionEndKey.length; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 73aa4db..bbe8422 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -752,7 +752,7 @@ public abstract class BaseTest { conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, IndexLoadBalancer.class, LoadBalancer.class); conf.setClass("hbase.coprocessor.regionserver.classes", LocalIndexMerger.class, - RegionServerObserver.class); + RegionServerObserver.class) ; conf.setInt("dfs.namenode.handler.count", 2); conf.setInt("dfs.namenode.service.handler.count", 2); conf.setInt("dfs.datanode.handler.count", 2); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 47101b2..1da68ba 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -386,7 +386,12 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException { return ResultIterator.EMPTY_ITERATOR; } - + + @Override + public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { + return ResultIterator.EMPTY_ITERATOR; + } + @Override public ResultIterator iterator() throws SQLException { return ResultIterator.EMPTY_ITERATOR; @@ -467,7 +472,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { return false; } - }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices())); + }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan()); List<KeyRange> keyRanges = parallelIterators.getSplits(); return keyRanges; }
