Temporary fix for PHOENIX-2714
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36ee23ab Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36ee23ab Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36ee23ab Branch: refs/heads/calcite Commit: 36ee23ab7760616394f32f88b2da6b038cfee1e9 Parents: 06fd581 Author: maryannxue <maryann....@gmail.com> Authored: Thu Feb 25 15:38:55 2016 -0500 Committer: maryannxue <maryann....@gmail.com> Committed: Thu Feb 25 15:38:55 2016 -0500 ---------------------------------------------------------------------- .../phoenix/calcite/rel/PhoenixTableScan.java | 4 +- .../apache/phoenix/execute/AggregatePlan.java | 2 - .../apache/phoenix/execute/BaseQueryPlan.java | 11 -- .../org/apache/phoenix/execute/ScanPlan.java | 2 - .../phoenix/iterate/BaseResultIterators.java | 154 ++++++++++++++++--- 5 files changed, 137 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 80c328f..dfdb507 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -36,6 +36,7 @@ import org.apache.phoenix.execute.ScanPlan; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; @@ -131,8 +132,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); WhereCompiler.setScanFilter(context, select, filterExpr, true, false); scanRanges = context.getScanRanges(); - ScanPlan plan = new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, true, null); - estimatedSize = plan.getEstimatedBytes(); + estimatedSize = BaseResultIterators.getEstimatedCount(context, tableRef.getTable()).getSecond(); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index f9eca0c..794247b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -208,8 +208,6 @@ public class AggregatePlan extends BaseQueryPlan { splits = iterators.getSplits(); scans = iterators.getScans(); - estimatedSize = iterators.getEstimatedByteCount(); - estimatedRows = iterators.getEstimatedRowCount(); AggregatingResultIterator aggResultIterator; // No need to merge sort for ungrouped aggregation http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 0ee70ba..f389fd0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -108,8 +108,6 @@ public abstract class BaseQueryPlan implements QueryPlan { * immediately before creating the ResultIterator. */ protected final Expression dynamicFilter; - protected Long estimatedRows; - protected Long estimatedSize; protected BaseQueryPlan( @@ -129,16 +127,7 @@ public abstract class BaseQueryPlan implements QueryPlan { this.parallelIteratorFactory = parallelIteratorFactory; this.dynamicFilter = dynamicFilter; } - - public Long getEstimatedRowCount() { - return this.estimatedRows; - } - public Long getEstimatedByteCount() { - return this.estimatedSize; - } - - @Override public Operation getOperation() { return Operation.QUERY; http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 72d95be..23bf435 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -208,8 +208,6 @@ public class ScanPlan extends BaseQueryPlan { } splits = iterators.getSplits(); scans = iterators.getScans(); - estimatedSize = iterators.getEstimatedByteCount(); - estimatedRows = iterators.getEstimatedRowCount(); if (isOrdered) { scanner = new MergeSortTopNResultIterator(iterators, limit, orderBy.getOrderByExpressions()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/36ee23ab/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index a48de13..22c0197 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -113,7 +113,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private final List<List<Scan>> scans; private final List<KeyRange> splits; - private final PTableStats tableStats; private final byte[] physicalTableName; private final QueryPlan plan; protected final String scanId; @@ -136,7 +135,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return plan.getTableRef().getTable(); } - private boolean useStats() { + private static boolean useStats(StatementContext context) { Scan scan = context.getScan(); boolean isPointLookup = context.getScanRanges().isPointLookup(); /* @@ -337,7 +336,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result TableRef tableRef = plan.getTableRef(); PTable table = tableRef.getTable(); physicalTableName = table.getPhysicalName().getBytes(); - tableStats = useStats() ? new MetaDataClient(context.getConnection()).getTableStats(table) : PTableStats.EMPTY_STATS; // Used to tie all the scans together during logging scanId = UUID.randomUUID().toString(); @@ -397,18 +395,18 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return guideIndex; } - private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) { + private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions) throws SQLException { /* * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan * entire regions worth of data to track where to put the guide posts. */ - if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; } + if (!useStats(context)) { return GuidePostsInfo.NO_GUIDEPOST; } GuidePostsInfo gps = null; - PTable table = getTable(); + PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table); Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts(); - byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable()); + byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table); if (table.getColumnFamilies().isEmpty()) { // For sure we can get the defaultCF from the table gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF); @@ -430,7 +428,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return gps; } - private GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) { + private static GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) { if (guidePostMap.get(defaultCF) != null) { return guidePostMap.get(defaultCF); } @@ -491,7 +489,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result whereConditions.add(cf); } } - GuidePostsInfo gps = getGuidePosts(whereConditions); + GuidePostsInfo gps = getGuidePosts(context, table, whereConditions); hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST; boolean traverseAllRegions = isSalted || isLocalIndex; if (!traverseAllRegions) { @@ -566,8 +564,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) { Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, false); - estimatedRows += gps.getRowCounts().get(guideIndex); - estimatedSize += gps.getByteCounts().get(guideIndex); + if (newScan != null) { + estimatedRows += gps.getRowCounts().get(guideIndex); + estimatedSize += gps.getByteCounts().get(guideIndex); + } scans = addNewScan(parallelScans, scans, newScan, currentGuidePostBytes, false, regionLocation); currentKeyBytes = currentGuidePost.copyBytes(); currentGuidePost = PrefixByteCodec.decode(decoder, input); @@ -603,6 +603,130 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return parallelScans; } + + /** + * Compute the estimated count of rows and bytes that will be scanned. + * @return the estimated row count and the byte count. + * @throws SQLException + */ + public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException { + if (table.getName() == null) { // empty table + return new Pair<Long, Long>(null, null); + } + + if (context.getScanRanges().isPointLookup()) { + return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table)); + } + + TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); + for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) { + byte[] cf = where.getFirst(); + if (cf != null) { + whereConditions.add(cf); + } + } + GuidePostsInfo gps = getGuidePosts(context, table, whereConditions); + if (gps == GuidePostsInfo.NO_GUIDEPOST) { + return new Pair<Long, Long>(null, null); + } + + byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY; + byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY; + Scan scan = context.getScan(); + List<HRegionLocation> regionLocations = context.getConnection().getQueryServices() + .getAllTableRegions(table.getPhysicalName().getBytes()); + List<byte[]> regionBoundaries = toBoundaries(regionLocations); + ScanRanges scanRanges = context.getScanRanges(); + boolean isSalted = table.getBucketNum() != null; + boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; + boolean traverseAllRegions = isSalted || isLocalIndex; + if (!traverseAllRegions) { + byte[] scanStartRow = scan.getStartRow(); + if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { + startKey = scanStartRow; + } + byte[] scanStopRow = scan.getStopRow(); + if (stopKey.length == 0 + || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) { + stopKey = scanStopRow; + } + } + + int regionIndex = 0; + int stopIndex = regionBoundaries.size(); + if (startKey.length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + } + if (stopKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + if (isLocalIndex) { + stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); + } + } + + ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey); + + int gpsSize = gps.getGuidePostsCount(); + int keyOffset = 0; + ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY; + ImmutableBytesWritable guidePosts = gps.getGuidePosts(); + ByteArrayInputStream stream = null; + DataInput input = null; + PrefixByteDecoder decoder = null; + int guideIndex = 0; + long estimatedRows = 0; + long estimatedSize = 0; + try { + if (gpsSize > 0) { + stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength()); + input = new DataInputStream(stream); + decoder = new PrefixByteDecoder(gps.getMaxLength()); + try { + while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0 + && currentKey.getLength() != 0) { + guideIndex++; + } + } catch (EOFException e) {} + } + byte[] currentKeyBytes = currentKey.copyBytes(); + + // Merge bisect with guideposts for all but the last region + while (regionIndex <= stopIndex) { + byte[] currentGuidePostBytes = currentGuidePost.copyBytes(); + byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY; + if (regionIndex == stopIndex) { + endKey = stopKey; + } else { + endKey = regionBoundaries.get(regionIndex); + } + HRegionLocation regionLocation = regionLocations.get(regionIndex); + if (isLocalIndex) { + HRegionInfo regionInfo = regionLocation.getRegionInfo(); + endRegionKey = regionInfo.getEndKey(); + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); + } + try { + while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) { + Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, + false); + if (newScan != null) { + estimatedRows += gps.getRowCounts().get(guideIndex); + estimatedSize += gps.getByteCounts().get(guideIndex); + } + currentKeyBytes = currentGuidePost.copyBytes(); + currentGuidePost = PrefixByteCodec.decode(decoder, input); + currentGuidePostBytes = currentGuidePost.copyBytes(); + guideIndex++; + } + } catch (EOFException e) {} + currentKeyBytes = endKey; + regionIndex++; + } + } finally { + if (stream != null) Closeables.closeQuietly(stream); + } + return new Pair<Long, Long>(estimatedRows, estimatedSize); + } public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) { if (!reverse) { @@ -884,14 +1008,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result buf.append(getName()).append(" ").append(size()).append("-WAY "); explain(buf.toString(),planSteps); } - - public Long getEstimatedRowCount() { - return this.estimatedRows; - } - - public Long getEstimatedByteCount() { - return this.estimatedSize; - } @Override public String toString() {