HIVE-10530: Aggregate stats cache: bug fixes for RDBMS path (Vaibhav Gumashta reviewed by Mostafa Mokhtar, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a0ccd11
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a0ccd11
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a0ccd11

Branch: refs/heads/beeline-cli
Commit: 4a0ccd11f56f4e47b76eae4e60668e78bedfc20b
Parents: 3633db2
Author: Vaibhav Gumashta <vgumas...@apache.org>
Authored: Thu May 7 13:58:34 2015 -0700
Committer: Vaibhav Gumashta <vgumas...@apache.org>
Committed: Thu May 7 13:58:34 2015 -0700

----------------------------------------------------------------------
 .../hive/metastore/AggregateStatsCache.java     | 33 +++++++++-----------
 .../hive/metastore/MetaStoreDirectSql.java      | 24 +++++++++-----
 .../test/queries/clientpositive/explainuser_2.q |  1 +
 .../extrapolate_part_stats_partial.q            |  2 ++
 .../extrapolate_part_stats_partial_ndv.q        |  2 ++
 .../queries/clientpositive/mapjoin_mapjoin.q    |  1 +
 6 files changed, 37 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
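
The headline fix in AggregateStatsCache below is a unit mismatch: the TTL is
read from configuration in seconds, while node timestamps are milliseconds
since the epoch, so cached nodes were expiring roughly 1000x too early. The
patch renames the field to timeToLiveMs and converts at load time. A minimal,
self-contained sketch of the corrected check (hypothetical class and field
names, not Hive's actual API):

    import java.util.concurrent.TimeUnit;

    public class TtlCheckSketch {
      private final long timeToLiveMs;
      private volatile long lastAccessTime = System.currentTimeMillis();

      public TtlCheckSketch(long ttlSeconds) {
        // Convert the configured seconds to milliseconds once, so the
        // comparison below is milliseconds against milliseconds.
        this.timeToLiveMs = TimeUnit.SECONDS.toMillis(ttlSeconds);
      }

      public boolean isExpired() {
        return (System.currentTimeMillis() - lastAccessTime) > timeToLiveMs;
      }
    }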


http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
index 6a85936..44106f5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/AggregateStatsCache.java
@@ -55,7 +55,7 @@ public class AggregateStatsCache {
   // Run the cleaner thread until cache is cleanUntil% occupied
   private final float cleanUntil;
   // Nodes go stale after this
-  private final long timeToLive;
+  private final long timeToLiveMs;
   // Max time when waiting for write locks on node list
   private final long maxWriterWaitTime;
   // Max time when waiting for read locks on node list
@@ -73,12 +73,12 @@ public class AggregateStatsCache {
   // To track cleaner metrics
   int numRemovedTTL = 0, numRemovedLRU = 0;
 
-  private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLive,
+  private AggregateStatsCache(int maxCacheNodes, int maxPartsPerCacheNode, long timeToLiveMs,
       float falsePositiveProbability, float maxVariance, long maxWriterWaitTime,
       long maxReaderWaitTime, float maxFull, float cleanUntil) {
     this.maxCacheNodes = maxCacheNodes;
     this.maxPartsPerCacheNode = maxPartsPerCacheNode;
-    this.timeToLive = timeToLive;
+    this.timeToLiveMs = timeToLiveMs;
     this.falsePositiveProbability = falsePositiveProbability;
     this.maxVariance = maxVariance;
     this.maxWriterWaitTime = maxWriterWaitTime;
@@ -97,9 +97,9 @@ public class AggregateStatsCache {
       int maxPartitionsPerCacheNode =
           HiveConf
              .getIntVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_MAX_PARTITIONS);
-      long timeToLive =
+      long timeToLiveMs =
          HiveConf.getTimeVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_TTL,
-              TimeUnit.SECONDS);
+              TimeUnit.SECONDS)*1000;
      // False positives probability we are ready to tolerate for the underlying bloom filter
       float falsePositiveProbability =
          HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_FPP);
@@ -120,7 +120,7 @@ public class AggregateStatsCache {
       float cleanUntil =
          HiveConf.getFloatVar(conf, HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_CLEAN_UNTIL);
       self =
-          new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLive,
+          new AggregateStatsCache(maxCacheNodes, maxPartitionsPerCacheNode, timeToLiveMs,
               falsePositiveProbability, maxVariance, maxWriterWaitTime, maxReaderWaitTime, maxFull,
               cleanUntil);
     }
@@ -213,7 +213,7 @@ public class AggregateStatsCache {
    * @return best matched node or null
    */
   private AggrColStats findBestMatch(List<String> partNames, List<AggrColStats> candidates) {
-    // Hits, misses, shouldSkip for a node
+    // Hits, misses tracked for a candidate node
     MatchStats matchStats;
     // MatchStats for each candidate
    Map<AggrColStats, MatchStats> candidateMatchStats = new HashMap<AggrColStats, MatchStats>();
@@ -227,26 +227,23 @@ public class AggregateStatsCache {
     // Note: we're not creating a copy of the list for saving memory
     for (AggrColStats candidate : candidates) {
       // Variance check
-      if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested)
-          / numPartsRequested) > maxVariance) {
+      if ((float) Math.abs((candidate.getNumPartsCached() - numPartsRequested) / numPartsRequested)
+          > maxVariance) {
         continue;
       }
       // TTL check
       if (isExpired(candidate)) {
         continue;
-      }
-      else {
+      } else {
         candidateMatchStats.put(candidate, new MatchStats(0, 0));
       }
     }
     // We'll count misses as we iterate
     int maxMisses = (int) maxVariance * numPartsRequested;
     for (String partName : partNames) {
-      for (AggrColStats candidate : candidates) {
-        matchStats = candidateMatchStats.get(candidate);
-        if (matchStats == null) {
-          continue;
-        }
+      for (Map.Entry<AggrColStats, MatchStats> entry : candidateMatchStats.entrySet()) {
+        AggrColStats candidate = entry.getKey();
+        matchStats = entry.getValue();
         if (candidate.getBloomFilter().test(partName.getBytes())) {
           ++matchStats.hits;
         } else {
@@ -464,7 +461,7 @@ public class AggregateStatsCache {
   }
 
   private boolean isExpired(AggrColStats aggrColStats) {
-    return System.currentTimeMillis() - aggrColStats.lastAccessTime > timeToLive;
+    return (System.currentTimeMillis() - aggrColStats.lastAccessTime) > timeToLiveMs;
   }
 
   /**
@@ -502,7 +499,7 @@ public class AggregateStatsCache {
 
     @Override
     public String toString() {
-      return "Database: " + dbName + ", Table: " + tblName + ", Column: " + 
colName;
+      return "database:" + dbName + ", table:" + tblName + ", column:" + 
colName;
     }
 
   }
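
The findBestMatch hunk above also tightens the scoring loop: candidates that
fail the variance or TTL pre-filter never enter candidateMatchStats, and the
inner loop now iterates that map's entrySet directly instead of re-walking the
full candidate list and null-checking each lookup. A standalone sketch of the
pattern (Candidate and MatchStats here are hypothetical stand-ins):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class MatchLoopSketch {
      static final class Candidate {
        final String prefix;
        Candidate(String prefix) { this.prefix = prefix; }
        boolean mightContain(String partName) { return partName.startsWith(prefix); }
      }

      static final class MatchStats {
        int hits, misses;
      }

      static Map<Candidate, MatchStats> score(List<Candidate> candidates,
          List<String> partNames) {
        Map<Candidate, MatchStats> survivors = new HashMap<>();
        for (Candidate c : candidates) {
          // Variance/TTL pre-filtering would go here; only survivors get stats.
          survivors.put(c, new MatchStats());
        }
        for (String partName : partNames) {
          // Iterating the entrySet skips pruned candidates outright, avoiding
          // the per-candidate map lookup and null check of the old loop.
          for (Map.Entry<Candidate, MatchStats> e : survivors.entrySet()) {
            if (e.getKey().mightContain(partName)) {
              e.getValue().hits++;
            } else {
              e.getValue().misses++;
            }
          }
        }
        return survivors;
      }
    }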

http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 5ef3b9a..8bee978 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -1106,24 +1106,23 @@ class MetaStoreDirectSql {
     if (isAggregateStatsCacheEnabled) {
       AggrColStats colStatsAggrCached;
       List<ColumnStatisticsObj> colStatsAggrFromDB;
-      int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
-      float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability();
+      int maxPartsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode();
+      float fpp = aggrStatsCache.getFalsePositiveProbability();
       int partitionsRequested = partNames.size();
-      if (partitionsRequested > maxPartitionsPerCacheNode) {
+      if (partitionsRequested > maxPartsPerCacheNode) {
         colStatsList =
            columnStatisticsObjForPartitions(dbName, tableName, partNames, colNames, partsFound,
                 useDensityFunctionForNDVEstimation);
       } else {
         colStatsList = new ArrayList<ColumnStatisticsObj>();
+        // Bloom filter for the new node that we will eventually add to the cache
+        BloomFilter bloomFilter = createPartsBloomFilter(maxPartsPerCacheNode, fpp, partNames);
         for (String colName : colNames) {
           // Check the cache first
          colStatsAggrCached = aggrStatsCache.get(dbName, tableName, colName, partNames);
           if (colStatsAggrCached != null) {
             colStatsList.add(colStatsAggrCached.getColStats());
           } else {
-            // Bloom filter for the new node that we will eventually add to the cache
-            BloomFilter bloomFilter =
-                new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability);
             List<String> colNamesForDB = new ArrayList<String>();
             colNamesForDB.add(colName);
             // Read aggregated stats for one column
@@ -1148,6 +1147,15 @@ class MetaStoreDirectSql {
     return new AggrStats(colStatsList, partsFound);
   }
 
+  private BloomFilter createPartsBloomFilter(int maxPartsPerCacheNode, float fpp,
+      List<String> partNames) {
+    BloomFilter bloomFilter = new BloomFilter(maxPartsPerCacheNode, fpp);
+    for (String partName : partNames) {
+      bloomFilter.add(partName.getBytes());
+    }
+    return bloomFilter;
+  }
+
   private long partsFoundForPartitions(String dbName, String tableName,
       List<String> partNames, List<String> colNames) throws MetaException {
     long partsFound = 0;
@@ -1174,8 +1182,8 @@ class MetaStoreDirectSql {
   }
 
   private List<ColumnStatisticsObj> columnStatisticsObjForPartitions(String dbName,
-      String tableName, List<String> partNames, List<String> colNames, long partsFound, boolean useDensityFunctionForNDVEstimation)
-      throws MetaException {
+      String tableName, List<String> partNames, List<String> colNames, long partsFound,
+      boolean useDensityFunctionForNDVEstimation) throws MetaException {
     // TODO: all the extrapolation logic should be moved out of this class,
     // only mechanical data retrieval should remain here.
     String commonPrefix = "select \"COLUMN_NAME\", \"COLUMN_TYPE\", "
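
The MetaStoreDirectSql change hoists bloom filter construction out of the
per-column loop: one filter is built per request, pre-populated with all the
requested partition names, and shared by every column that misses the cache,
instead of a fresh filter being allocated inside the loop for every column.
A self-contained illustration of that pattern (SketchBloomFilter is a
hypothetical stand-in, not Hive's BloomFilter):

    import java.nio.charset.StandardCharsets;
    import java.util.BitSet;
    import java.util.List;

    final class SketchBloomFilter {
      private static final int SIZE = 1 << 16;
      private final BitSet bits = new BitSet(SIZE);

      void add(byte[] key) {
        bits.set(index(key, 1));
        bits.set(index(key, 2));
      }

      boolean test(byte[] key) {
        return bits.get(index(key, 1)) && bits.get(index(key, 2));
      }

      // Two cheap seeded hashes stand in for a real bloom filter's k hashes.
      private static int index(byte[] key, int seed) {
        int h = seed;
        for (byte b : key) {
          h = 31 * h + b;
        }
        return (h & 0x7fffffff) % SIZE;
      }
    }

    class AggrStatsRequestSketch {
      // Built once per request, before the per-column loop; the old code
      // re-created an equivalent filter inside the loop for every column.
      static SketchBloomFilter buildPartsFilter(List<String> partNames) {
        SketchBloomFilter filter = new SketchBloomFilter();
        for (String partName : partNames) {
          filter.add(partName.getBytes(StandardCharsets.UTF_8));
        }
        return filter;
      }
    }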

http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/explainuser_2.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/explainuser_2.q b/ql/src/test/queries/clientpositive/explainuser_2.q
index 8e8ac92..6e98fa0 100644
--- a/ql/src/test/queries/clientpositive/explainuser_2.q
+++ b/ql/src/test/queries/clientpositive/explainuser_2.q
@@ -1,4 +1,5 @@
 set hive.explain.user=true;
+set hive.metastore.aggregate.stats.cache.enabled=false;
 
 CREATE TABLE dest_j1(key STRING, value STRING, val2 STRING) STORED AS TEXTFILE;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q
index 8ae9a90..5c062ee 100644
--- a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q
+++ b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial.q
@@ -1,6 +1,8 @@
 set hive.stats.fetch.column.stats=true;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.metastore.aggregate.stats.cache.enabled=false;
+
 
 create table if not exists ext_loc (
   state string,

http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q
index b7fc4e3..5f0160a 100644
--- a/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q
+++ b/ql/src/test/queries/clientpositive/extrapolate_part_stats_partial_ndv.q
@@ -2,6 +2,8 @@ set hive.metastore.stats.ndv.densityfunction=true;
 set hive.stats.fetch.column.stats=true;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.metastore.aggregate.stats.cache.enabled=false;
+
 
 drop table if exists ext_loc;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4a0ccd11/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
index 5bf4ab1..7f66ff2 100644
--- a/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
+++ b/ql/src/test/queries/clientpositive/mapjoin_mapjoin.q
@@ -1,6 +1,7 @@
 set hive.auto.convert.join=true;
 set hive.auto.convert.join.noconditionaltask=true;
 set hive.auto.convert.join.noconditionaltask.size=10000;
+set hive.metastore.aggregate.stats.cache.enabled=false;
 
 -- Since the inputs are small, it should be automatically converted to mapjoin
 
