PHOENIX-4686 Phoenix stats does not account for server side limit push downs (Abhishek Chouhan)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bebe66c4 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bebe66c4 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bebe66c4 Branch: refs/heads/system-catalog Commit: bebe66c4680a7cc3b09a703f8608f966ad4905f1 Parents: 9168f66 Author: James Taylor <jtay...@salesforce.com> Authored: Thu Apr 26 09:14:52 2018 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Thu Apr 26 09:55:28 2018 -0700 ---------------------------------------------------------------------- .../end2end/ExplainPlanWithStatsEnabledIT.java | 49 ++++++++++++++++++- .../org/apache/phoenix/execute/ScanPlan.java | 4 +- .../phoenix/iterate/BaseResultIterators.java | 50 +++++++++++++++++--- .../phoenix/iterate/ParallelIterators.java | 8 ++++ .../apache/phoenix/iterate/SerialIterators.java | 13 ++--- .../phoenix/schema/stats/StatisticsUtil.java | 6 +++ 6 files changed, 111 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java index 2099f4c..abaa2f6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java @@ -40,6 +40,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EnvironmentEdge; @@ -123,6 +124,50 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT { } @Test + public void testBytesRowsForPointSelectWithLimitGreaterThanPointLookupSize() throws Exception { + String sql = "SELECT * FROM " + tableA + " where k in (? ,?) limit 4"; + List<Object> binds = Lists.newArrayList(); + binds.add(103); binds.add(104); + try (Connection conn = DriverManager.getConnection(getUrl())) { + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 200L, info.estimatedBytes); + assertEquals((Long) 2L, info.estimatedRows); + assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); + } + } + + @Test + public void testBytesRowsForSelectWithLimit() throws Exception { + String sql = "SELECT * FROM " + tableA + " where c1.a in (?,?) limit 3"; + String noIndexSQL = "SELECT /*+ NO_INDEX */ * FROM " + tableA + " where c1.a in (?,?) limit 3"; + List<Object> binds = Lists.newArrayList(); + binds.add(1); binds.add(2); + try (Connection conn = DriverManager.getConnection(getUrl())) { + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 264L, info.estimatedBytes); + assertEquals((Long) 3L, info.estimatedRows); + assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); + + info = getByteRowEstimates(conn, noIndexSQL, binds); + assertEquals((Long) 634L, info.estimatedBytes); + assertEquals((Long) 10L, info.estimatedRows); + assertTrue(info.estimateInfoTs > 0); + } + } + + @Test + public void testBytesRowsForSelectWithLimitIgnored() throws Exception { + String sql = "SELECT * FROM " + tableA + " where (c1.a > c2.b) limit 1"; + List<Object> binds = Lists.newArrayList(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + Estimate info = getByteRowEstimates(conn, sql, binds); + assertEquals((Long) 691L, info.estimatedBytes); + assertEquals((Long) 10L, info.estimatedRows); + assertTrue(info.estimateInfoTs > 0); + } + } + + @Test public void testBytesRowsForSelectWhenKeyInRange() throws Exception { String sql = "SELECT * FROM " + tableB + " where k >= ?"; List<Object> binds = Lists.newArrayList(); @@ -278,7 +323,7 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT { Estimate info = getByteRowEstimates(conn, sql, binds); assertEquals((Long) 200L, info.estimatedBytes); assertEquals((Long) 2L, info.estimatedRows); - assertTrue(info.estimateInfoTs > 0); + assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); } } @@ -305,7 +350,7 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT { Estimate info = getByteRowEstimates(conn, sql, binds); assertEquals((Long) 176L, info.estimatedBytes); assertEquals((Long) 2L, info.estimatedRows); - assertTrue(info.estimateInfoTs > 0); + assertEquals((Long) StatisticsUtil.NOT_STATS_BASED_TS, info.estimateInfoTs); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index ed145a4..cdb2da5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -68,8 +68,8 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.CostUtil; -import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -124,7 +124,7 @@ public class ScanPlan extends BaseQueryPlan { if (isSerial) { serialBytesEstimate = estimate.getFirst(); serialRowsEstimate = estimate.getSecond(); - serialEstimateInfoTs = EnvironmentEdgeManager.currentTimeMillis(); + serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 682d1ed..aa9a9f5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -76,6 +77,7 @@ import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.filter.BooleanExpressionFilter; import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; @@ -170,6 +172,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return plan.getTableRef().getTable(); } + abstract protected boolean isSerial(); + protected boolean useStats() { /* * Don't use guide posts: @@ -180,7 +184,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (ScanUtil.isAnalyzeTable(scan)) { return false; } - return true; + return !isSerial(); } private static void initializeScan(QueryPlan plan, Integer perScanLimit, Integer offset, Scan scan) throws SQLException { @@ -1105,10 +1109,25 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } regionIndex++; } - if (scanRanges.isPointLookup()) { - this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount()); + if (!scans.isEmpty()) { // Add any remaining scans + parallelScans.add(scans); + } + Long pageLimit = getUnfilteredPageLimit(scan); + if (scanRanges.isPointLookup() || pageLimit != null) { + // If run in parallel, the limit is pushed to each parallel scan so must be accounted for in all of them + int parallelFactor = this.isSerial() ? 1 : parallelScans.size(); + if (scanRanges.isPointLookup() && pageLimit != null) { + this.estimatedRows = Long.valueOf(Math.min(scanRanges.getPointLookupCount(), pageLimit * parallelFactor)); + } else if (scanRanges.isPointLookup()) { + this.estimatedRows = Long.valueOf(scanRanges.getPointLookupCount()); + } else { + this.estimatedRows = Long.valueOf(pageLimit) * parallelFactor; + } this.estimatedSize = this.estimatedRows * SchemaUtil.estimateRowSize(table); - this.estimateInfoTimestamp = computeMinTimestamp(gpsAvailableForAllRegions, estimates, fallbackTs); + // Indication to client that the statistics estimates were not + // calculated based on statistics but instead are based on row + // limits from the query. + this.estimateInfoTimestamp = StatisticsUtil.NOT_STATS_BASED_TS; } else if (emptyGuidePost) { // In case of an empty guide post, we estimate the number of rows scanned by // using the estimated row size @@ -1124,9 +1143,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result this.estimatedSize = null; this.estimateInfoTimestamp = null; } - if (!scans.isEmpty()) { // Add any remaining scans - parallelScans.add(scans); - } } finally { if (stream != null) Closeables.closeQuietly(stream); } @@ -1134,6 +1150,26 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return parallelScans; } + /** + * Return row count limit of PageFilter if exists and there is no where + * clause filter. + * @return + */ + private static Long getUnfilteredPageLimit(Scan scan) { + Long pageLimit = null; + Iterator<Filter> filters = ScanUtil.getFilterIterator(scan); + while (filters.hasNext()) { + Filter filter = filters.next(); + if (filter instanceof BooleanExpressionFilter) { + return null; + } + if (filter instanceof PageFilter) { + pageLimit = ((PageFilter)filter).getPageSize(); + } + } + return pageLimit; + } + private static Long computeMinTimestamp(boolean gpsAvailableForAllRegions, GuidePostEstimate estimates, long fallbackTs) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index 3a4b084..41d278d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -69,6 +69,14 @@ public class ParallelIterators extends BaseResultIterators { this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan); } + /** + * No need to use stats when executing serially + */ + @Override + protected boolean isSerial() { + return false; + } + @Override protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, final boolean isReverse, ParallelScanGrouper scanGrouper) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index f94a7c9..c13fcdb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -73,6 +73,11 @@ public class SerialIterators extends BaseResultIterators { } @Override + protected boolean isSerial() { + return true; + } + + @Override protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures, final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse, final ParallelScanGrouper scanGrouper) { ExecutorService executor = context.getConnection().getQueryServices().getExecutor(); @@ -117,14 +122,6 @@ public class SerialIterators extends BaseResultIterators { } } - /** - * No need to use stats when executing serially - */ - @Override - protected boolean useStats() { - return false; - } - @Override protected String getName() { return NAME; http://git-wip-us.apache.org/repos/asf/phoenix/blob/bebe66c4/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java index 0b9c409..4a758b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java @@ -47,6 +47,12 @@ import com.google.common.collect.Sets; * Simple utility class for managing multiple key parts of the statistic */ public class StatisticsUtil { + /** + * Indication to client that the statistics estimates were not + * calculated based on statistics but instead are based on row + * limits from the query. + */ + public static final long NOT_STATS_BASED_TS = 0; private static final Set<TableName> DISABLE_STATS = Sets.newHashSetWithExpectedSize(8); // TODO: make this declarative through new DISABLE_STATS column on SYSTEM.CATALOG table.