Repository: phoenix
Updated Branches:
  refs/heads/master 8a72032e2 -> a44387358
PHOENIX-2949 Fix estimated region size when checking for serial query Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a4438735 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a4438735 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a4438735 Branch: refs/heads/master Commit: a44387358d6b58b77358a42f38c5baac9e2ab527 Parents: 8a72032 Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Thu Jun 23 13:54:33 2016 -0700 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Thu Jun 23 13:54:33 2016 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/execute/ScanPlan.java | 46 ++++++++++---------- .../org/apache/phoenix/query/QueryServices.java | 2 +- 2 files changed, 25 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a4438735/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index c55a1cc..0975b3f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -25,7 +25,7 @@ import java.sql.SQLException; import java.util.Collections; import java.util.List; -import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -62,7 +62,6 @@ import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import 
org.apache.phoenix.schema.stats.PTableStats; -import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ScanUtil; @@ -118,7 +117,7 @@ public class ScanPlan extends BaseQueryPlan { Scan scan = context.getScan(); /* * If a limit is provided and we have no filter, run the scan serially when we estimate that - * the limit's worth of data will fit into a single region. + * the limit's worth of data is less than the threshold bytes provided in QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD */ Integer perScanLimit = !allowPageFilter ? null : limit; if (perScanLimit == null || scan.getFilter() != null) { @@ -127,32 +126,35 @@ public class ScanPlan extends BaseQueryPlan { long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : context.getConnection().getSCN(); PTableStats tableStats = context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(), scn); GuidePostsInfo gpsInfo = tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table)); - long estRowSize = SchemaUtil.estimateRowSize(table); - long estRegionSize; + ConnectionQueryServices services = context.getConnection().getQueryServices(); + long estRowSize; + long estimatedParallelThresholdBytes; if (gpsInfo == null) { - // Use guidepost depth as minimum size - ConnectionQueryServices services = context.getConnection().getQueryServices(); - HTableDescriptor desc = services.getTableDescriptor(table.getPhysicalName().getBytes()); - int guidepostPerRegion = services.getProps().getInt(QueryServices.STATS_GUIDEPOST_PER_REGION_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_PER_REGION); - long guidepostWidth = services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, - QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); - estRegionSize = StatisticsUtil.getGuidePostDepth(guidepostPerRegion, guidepostWidth, desc); + estRowSize = 
SchemaUtil.estimateRowSize(table); + estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); } else { - // Region size estimated based on total number of bytes divided by number of regions long totByteSize = 0; + long totRowCount = 0; for (long byteCount : gpsInfo.getByteCounts()) { totByteSize += byteCount; } - estRegionSize = totByteSize / (gpsInfo.getGuidePostsCount()+1); + for (long rowCount : gpsInfo.getRowCounts()) { + totRowCount += rowCount; + } + estRowSize = totByteSize / totRowCount; + estimatedParallelThresholdBytes = 2 + * services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, + QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); } - // TODO: configurable number of bytes? - boolean isSerial = (perScanLimit * estRowSize < estRegionSize); - - if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("With LIMIT=" + perScanLimit - + ", estimated row size=" + estRowSize - + ", estimated region size=" + estRegionSize + " (" + (gpsInfo == null ? "without " : "with ") + "stats)" - + ": " + (isSerial ? "SERIAL" : "PARALLEL") + " execution", context.getConnection())); + long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD, + estimatedParallelThresholdBytes); + boolean isSerial = (perScanLimit * estRowSize < limitThreshold); + + if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations( + "With LIMIT=" + perScanLimit + ", estimated row size=" + estRowSize + ", limitThreshold=" + + limitThreshold + ": " + (isSerial ? 
"SERIAL" : "PARALLEL") + " execution", + context.getConnection())); return isSerial; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a4438735/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index f5e2a0a..e255e61 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -214,7 +214,7 @@ public interface QueryServices extends SQLCloseable { public static final String HCONNECTION_POOL_CORE_SIZE = "hbase.hconnection.threads.core"; public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max"; public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max"; - + public static final String QUERY_PARALLEL_LIMIT_THRESHOLD = "phoenix.query.parallelThresholdBytes"; // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index) public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time"; public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";