HIVE-10530: Aggregate stats cache: bug fixes for RDBMS path (Vaibhav Gumashta reviewed by Mostafa Mokhtar, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a0ccd11 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a0ccd11 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a0ccd11 Branch: refs/heads/beeline-cli Commit: 4a0ccd11f56f4e47b76eae4e60668e78bedfc20b Parents: 3633db2 Author: Vaibhav Gumashta <vgumas...@apache.org> Authored: Thu May 7 13:58:34 2015 -0700 Committer: Vaibhav Gumashta <vgumas...@apache.org> Committed: Thu May 7 13:58:34 2015 -0700 ---------------------------------------------------------------------- .../hive/metastore/AggregateStatsCache.java | 33 +++++++++----------- .../hive/metastore/MetaStoreDirectSql.java | 24 +++++++++----- .../test/queries/clientpositive/explainuser_2.q | 1 + .../extrapolate_part_stats_partial.q | 2 ++ .../extrapolate_part_stats_partial_ndv.q | 2 ++ .../queries/clientpositive/mapjoin_mapjoin.q | 1 + 6 files changed, 37 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java index 6a85936..44106f5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java @@ -55,7 +55,7 @@ public class AggregateStatsCache { // Run the cleaner thread until cache is cleanUntil% occupied private final float cleanUntil; // Nodes go stale after this - private final long timeToLive; + private final long timeToLiveMs; // Max time when waiting for write locks on node list private final long maxWriterWaitTime; // Max time when waiting for read locks on node list @@ -73,12 +73,12 @@ public class AggregateStatsCache { // To track cleaner metrics int numRemovedTTL = 0, numRemovedLRU = 0; - private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLive, + private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLiveMs, float falsePositiveProbability, float maxVariance, long maxWriterWaitTime, long maxReaderWaitTime, float maxFull, float cleanUntil) { this.maxCacheNodes = maxCacheNodes; this.maxPartsPerCacheNode = maxPartsPerCacheNode; - this.timeToLive = timeToLive; + this.timeToLiveMs = timeToLiveMs; this.falsePositiveProbability = falsePositiveProbability; this.maxVariance = maxVariance; this.maxWriterWaitTime = maxWriterWaitTime; @@ -97,9 +97,9 @@ public class AggregateStatsCache { int maxPartitionsPerCacheNode = HiveConf .getIntVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS); - long timeToLive = + long timeToLiveMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL, - TimeUnit.SECONDS); + TimeUnit.SECONDS)*1000; // False positives probability we are ready to tolerate for the underlying bloom filter float falsePositiveProbability = HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_FPP); @@ -120,7 +120,7 @@ public class AggregateStatsCache { float cleanUntil = HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL); self = - new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLive, + new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLiveMs, falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull, cleanUntil); } @@ -213,7 +213,7 @@ public class AggregateStatsCache { * @return best matched node or null */ private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) { - // Hits, misses, shouldSkip for a node + // Hits, misses tracked for a candidate node MatchStats matchStats; // MatchStats for each candidate Map<AggrColStats, MatchStats> candidateMatchStats = new HashMap<AggrColStats, MatchStats>(); @@ -227,26 +227,23 @@ public class AggregateStatsCache { // Note: we're not creating a copy of the list for saving memory for (AggrColStats candidate : candidates) { // Variance check - if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested) - / numPartsRequested) > maxVariance) { + if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested) / numPartsRequested) + > maxVariance) { continue; } // TTL check if (isExpired(candidate)) { continue; - } - else { + } else { candidateMatchStats.put(candidate, new MatchStats(0, 0)); } } // We'll count misses as we iterate int maxMisses = (int) maxVariance * numPartsRequested; for (String partName : partNames) { - for (AggrColStats candidate : candidates) { - matchStats = candidateMatchStats.get(candidate); - if (matchStats == null) { - continue; - } + for (Map.Entry<AggrColStats, MatchStats> entry : candidateMatchStats.entrySet()) { + AggrColStats candidate = entry.getKey(); + matchStats = entry.getValue(); if (candidate.getBloomFilter().test(partName.getBytes())) { ++matchStats.hits; } else { @@ -464,7 +461,7 @@ public class AggregateStatsCache { } private boolean isExpired(AggrColStats aggrColStats) { - return System.currentTimeMillis() - aggrColStats.lastAccessTime > timeToLive; + return (System.currentTimeMillis() - aggrColStats.lastAccessTime) > timeToLiveMs; } /** @@ -502,7 +499,7 @@ public class AggregateStatsCache { @Override public String toString() { - return "Database: " + dbName + ", Table: " + tblName + ", Column: " + colName; + return "database:" + dbName + ", table:" + tblName + ", column:" + colName; } } http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java index 5ef3b9a..8bee978 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java @@ -1106,24 +1106,23 @@ class MetaStoreDirectSql { if (isAggregateStatsCacheEnabled) { AggrColStats colStatsAggrCached; List<ColumnStatisticsObj> colStatsAggrFromDB; - int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); - float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability(); + int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); + float fpp = aggrStatsCache.getFalsePositiveProbability(); int partitionsRequested = partNames.size(); - if (partitionsRequested > maxPartitionsPerCacheNode) { + if (partitionsRequested > maxPartsPerCacheNode) { colStatsList = columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound, useDensityFunctionForNDVEstimation); } else { colStatsList = new ArrayList<ColumnStatisticsObj>(); + // Bloom filter for the new node that we will eventually add to the cache + BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames); for (String colName : colNames) { // Check the cache first colStatsAggrCached = aggrStatsCache.get(dbName, tableName, colName, partNames); if (colStatsAggrCached != null) { colStatsList.add(colStatsAggrCached.getColStats()); } else { - // Bloom filter for the new node that we will eventually add to the cache - BloomFilter bloomFilter = - new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability); List<String> colNamesForDB = new ArrayList<String>(); colNamesForDB.add(colName); // Read aggregated stats for one column @@ -1148,6 +1147,15 @@ class MetaStoreDirectSql { return new AggrStats(colStatsList, partsFound); } + private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, float fpp, + List<String> partNames) { + BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp); + for (String partName : partNames) { + bloomFilter.add(partName.getBytes()); + } + return bloomFilter; + } + private long partsFoundForPartitions(String dbName, String tableName, List<String> partNames, List<String> colNames) throws MetaException { long partsFound = 0; @@ -1174,8 +1182,8 @@ class MetaStoreDirectSql { } private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(String dbName, - String tableName, List<String> partNames, List<String> colNames, long partsFound, boolean useDensityFunctionForNDVEstimation) - throws MetaException { + String tableName, List<String> partNames, List<String> colNames, long partsFound, + boolean useDensityFunctionForNDVEstimation) throws MetaException { // TODO: all the extrapolation logic should be moved out of this class, // only mechanical data retrieval should remain here. String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", " http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/explainuser_2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/explainuser_2.q b/ql/src/test/queries/clientpositive/explainuser_2.q index 8e8ac92..6e98fa0 100644 --- a/ql/src/test/queries/clientpositive/explainuser_2.q +++ b/ql/src/test/queries/clientpositive/explainuser_2.q @@ -1,4 +1,5 @@ set hive.explain.user=true; +set hive.metastore.aggregate.stats.cache.enabled=false; CREATE TABLE dest_j1(key STRING, value STRING, val2 STRING) STORED AS TEXTFILE; http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q index 8ae9a90..5c062ee 100644 --- a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q +++ b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q @@ -1,6 +1,8 @@ set hive.stats.fetch.column.stats=true; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; +set hive.metastore.aggregate.stats.cache.enabled=false; + create table if not exists ext_loc ( state string, http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q index b7fc4e3..5f0160a 100644 --- a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q +++ b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q @@ -2,6 +2,8 @@ set hive.metastore.stats.ndv.densityfunction=true; set hive.stats.fetch.column.stats=true; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; +set hive.metastore.aggregate.stats.cache.enabled=false; + drop table if exists ext_loc; http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q index 5bf4ab1..7f66ff2 100644 --- a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q +++ b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q @@ -1,6 +1,7 @@ set hive.auto.convert.join=true; set hive.auto.convert.join.noconditionaltask=true; set hive.auto.convert.join.noconditionaltask.size=10000; +set hive.metastore.aggregate.stats.cache.enabled=false; -- Since the inputs are small, it should be automatically converted to mapjoin