HIVE-11294 Use HBase to cache aggregated stats (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c53c6f45 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c53c6f45 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c53c6f45 Branch: refs/heads/master Commit: c53c6f45988db869d56abe3b1d831ff775f4fa73 Parents: 1a1c0d8 Author: Alan Gates <ga...@hortonworks.com> Authored: Wed Jul 22 11:17:01 2015 -0700 Committer: Alan Gates <ga...@hortonworks.com> Committed: Wed Jul 22 11:17:01 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 51 +- .../apache/hive/common/util/BloomFilter.java | 20 +- .../TestHBaseAggrStatsCacheIntegration.java | 499 +++ .../metastore/hbase/HbaseMetastoreProto.java | 4189 +++++++++++++++++- .../hbase/AggrStatsInvalidatorFilter.java | 121 + .../hadoop/hive/metastore/hbase/Counter.java | 6 + .../hive/metastore/hbase/HBaseReadWrite.java | 316 +- .../hadoop/hive/metastore/hbase/HBaseStore.java | 47 +- .../hadoop/hive/metastore/hbase/HBaseUtils.java | 81 +- .../hadoop/hive/metastore/hbase/StatsCache.java | 326 ++ .../stats/ColumnStatsAggregatorFactory.java | 51 + .../metastore/hbase/hbase_metastore_proto.proto | 30 + .../hbase/TestHBaseAggregateStatsCache.java | 316 ++ .../hive/metastore/hbase/TestHBaseStore.java | 2 +- 14 files changed, 5717 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5eb11c2..c42b030 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -18,25 +18,7 @@ package org.apache.hadoop.hive.conf; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import javax.security.auth.login.LoginException; - +import com.google.common.base.Joiner; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,7 +36,23 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; import org.apache.hive.common.HiveCompat; -import com.google.common.base.Joiner; +import javax.security.auth.login.LoginException; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Hive Configuration. 
@@ -417,6 +415,19 @@ public class HiveConf extends Configuration { METASTORE_HBASE_CONNECTION_CLASS("hive.metastore.hbase.connection.class", "org.apache.hadoop.hive.metastore.hbase.VanillaHBaseConnection", "Class used to connect to HBase"), + METASTORE_HBASE_AGGR_STATS_CACHE_ENTRIES("hive.metastore.hbase.aggr.stats.cache.entries", + 10000, "How many aggregated stats objects to cache in memory"), + METASTORE_HBASE_AGGR_STATS_MEMORY_TTL("hive.metastore.hbase.aggr.stats.memory.ttl", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Number of seconds stats objects live in memory after they are read from HBase."), + METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY( + "hive.metastore.hbase.aggr.stats.invalidator.frequency", "5s", + new TimeValidator(TimeUnit.SECONDS), + "How often the stats cache scans its HBase entries and looks for expired entries"), + METASTORE_HBASE_AGGR_STATS_HBASE_TTL("hive.metastore.hbase.aggr.stats.hbase.ttl", "604800s", + new TimeValidator(TimeUnit.SECONDS), + "Number of seconds stats entries live in the HBase cache after they are created. They may be" + + " invalidated by updates or partition drops before this. Default is one week."), METASTORETHRIFTCONNECTIONRETRIES("hive.metastore.connect.retries", 3, "Number of retries while opening a connection to metastore"), http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/common/src/java/org/apache/hive/common/util/BloomFilter.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hive/common/util/BloomFilter.java b/common/src/java/org/apache/hive/common/util/BloomFilter.java index 656ba8a..d894241 100644 --- a/common/src/java/org/apache/hive/common/util/BloomFilter.java +++ b/common/src/java/org/apache/hive/common/util/BloomFilter.java @@ -18,9 +18,10 @@ package org.apache.hive.common.util; -import static com.google.common.base.Preconditions.checkArgument; - import java.util.Arrays; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; /** * BloomFilter is a probabilistic data structure for set membership check. BloomFilters are @@ -63,6 +64,21 @@ public class BloomFilter { this.bitSet = new BitSet(numBits); } + /** + * A constructor to support rebuilding the BloomFilter from a serialized representation. + * @param bits + * @param numBits + * @param numFuncs + */ + public BloomFilter(List<Long> bits, int numBits, int numFuncs) { + super(); + long[] copied = new long[bits.size()]; + for (int i = 0; i < bits.size(); i++) copied[i] = bits.get(i); + bitSet = new BitSet(copied); + this.numBits = numBits; + numHashFunctions = numFuncs; + } + static int optimalNumOfHashFunctions(long n, long m) { return Math.max(1, (int) Math.round((double) m / n * Math.log(2))); } http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java new file mode 100644 index 0000000..7e6a2ef --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java @@ -0,0 +1,499 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Integration tests with HBase Mini-cluster for HBaseStore + */ +public class TestHBaseAggrStatsCacheIntegration extends HBaseIntegrationTests { + + private static final Log LOG = LogFactory.getLog(TestHBaseAggrStatsCacheIntegration.class.getName()); + + @Rule public ExpectedException thrown = ExpectedException.none(); + + @BeforeClass + public static void startup() throws Exception { + HBaseIntegrationTests.startMiniCluster(); + } + + @AfterClass + public static void shutdown() throws Exception { + HBaseIntegrationTests.shutdownMiniCluster(); + } + + @Before + public void setup() throws IOException { + setupConnection(); + setupHBaseStore(); + store.backdoor().getStatsCache().resetCounters(); + } + + private static interface Checker { + void checkStats(AggrStats aggrStats) throws Exception; + } + + @Test + public void hit() throws Exception { + String dbName = "default"; + String tableName = "hit"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + cols.add(new FieldSchema("col2", "varchar", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + 
Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + for (List<String> partVals : Arrays.asList(partVals1, partVals2)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(10); + bcsd.setNumTrues(20); + bcsd.setNumNulls(30); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + obj = new ColumnStatisticsObj(); + obj.setColName("col2"); + obj.setColType("varchar"); + data = new ColumnStatisticsData(); + StringColumnStatsData scsd = new StringColumnStatsData(); + scsd.setAvgColLen(10.3); + scsd.setMaxColLen(2000); + scsd.setNumNulls(3); + scsd.setNumDVs(12342); + data.setStringStats(scsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(4, aggrStats.getPartsFound()); + Assert.assertEquals(2, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("boolean", cso.getColType()); + BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats(); + Assert.assertEquals(20, bcsd.getNumFalses()); + Assert.assertEquals(40, bcsd.getNumTrues()); + Assert.assertEquals(60, bcsd.getNumNulls()); + + cso = aggrStats.getColStats().get(1); + Assert.assertEquals("col2", cso.getColName()); + Assert.assertEquals("string", cso.getColType()); + StringColumnStatsData scsd = cso.getStatsData().getStringStats(); + Assert.assertEquals(10.3, scsd.getAvgColLen(), 0.1); + Assert.assertEquals(2000, scsd.getMaxColLen()); + Assert.assertEquals(6, scsd.getNumNulls()); + Assert.assertEquals(12342, scsd.getNumDVs()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1", "col2")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + // Call again, this time it should come from memory. Also, reverse the name order this time + // to assure that we still hit. 
+ aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1", "col2")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + store.backdoor().getStatsCache().flushMemory(); + // Call again, this time it should come from hbase + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1", "col2")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(2, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(6, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + } + + @Test + public void someWithStats() throws Exception { + String dbName = "default"; + String tableName = "psws"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "long", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + boolean first = true; + for (List<String> partVals : Arrays.asList(partVals1, partVals2)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/psws/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + + if (first) { + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("long"); + ColumnStatisticsData data = new ColumnStatisticsData(); + LongColumnStatsData lcsd = new LongColumnStatsData(); + lcsd.setHighValue(192L); + lcsd.setLowValue(-20L); + lcsd.setNumNulls(30); + lcsd.setNumDVs(32); + data.setLongStats(lcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + first = false; + } + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(1, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("long", cso.getColType()); + LongColumnStatsData lcsd = cso.getStatsData().getLongStats(); + Assert.assertEquals(192L, lcsd.getHighValue()); + Assert.assertEquals(-20L, lcsd.getLowValue()); + Assert.assertEquals(30, lcsd.getNumNulls()); + Assert.assertEquals(32, lcsd.getNumDVs()); + } + }; + + 
AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + // Call again, this time it should come from memory. Also, reverse the name order this time + // to assure that we still hit. + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + store.backdoor().getStatsCache().flushMemory(); + // Call again, this time it should come from hbase + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(1, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + } + + @Test + public void invalidation() throws Exception { + try { + String dbName = "default"; + String tableName = "invalidation"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + List<String> partVals3 = Arrays.asList("tomorrow"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(10); + bcsd.setNumTrues(20); + bcsd.setNumNulls(30); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + } + + Checker statChecker = new Checker() { + @Override + public 
void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(2, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("boolean", cso.getColType()); + BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats(); + Assert.assertEquals(20, bcsd.getNumFalses()); + Assert.assertEquals(40, bcsd.getNumTrues()); + Assert.assertEquals(60, bcsd.getNumNulls()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + // Call again, this time it should come from memory. Also, reverse the name order this time + // to assure that we still hit. + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + // Now call a different combination to get it in memory too + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + // wake the invalidator and check again to make sure it isn't too aggressive about + // removing our stuff. 
+ store.backdoor().getStatsCache().wakeInvalidator(); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(5, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + // Update statistics for 'tomorrow' + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals3.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(100); + bcsd.setNumTrues(200); + bcsd.setNumNulls(300); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + Checker afterUpdate = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(2, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("boolean", cso.getColType()); + BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats(); + Assert.assertEquals(110, bcsd.getNumFalses()); + Assert.assertEquals(220, bcsd.getNumTrues()); + Assert.assertEquals(330, bcsd.getNumNulls()); + } + }; + + store.updatePartitionColumnStatistics(cs, partVals3); + + store.backdoor().getStatsCache().setRunInvalidatorEvery(100); + store.backdoor().getStatsCache().wakeInvalidator(); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + afterUpdate.checkStats(aggrStats); + + // Check that we missed, which means this aggregate was dropped from the cache. + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(6, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt()); + + // Check that our other aggregate is still in the cache. 
+ aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(7, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt()); + + // Drop 'yesterday', so our first aggregate should be dumped from memory and hbase + store.dropPartition(dbName, tableName, partVals2); + + store.backdoor().getStatsCache().wakeInvalidator(); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(1, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("boolean", cso.getColType()); + BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats(); + Assert.assertEquals(10, bcsd.getNumFalses()); + Assert.assertEquals(20, bcsd.getNumTrues()); + Assert.assertEquals(30, bcsd.getNumNulls()); + } + }.checkStats(aggrStats); + + // Check that we missed, which means this aggregate was dropped from the cache. + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(8, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt()); + + // Check that our other aggregate is still in the cache. + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + afterUpdate.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(9, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt()); + } finally { + store.backdoor().getStatsCache().setRunInvalidatorEvery(5000); + store.backdoor().getStatsCache().setMaxTimeInCache(500000); + store.backdoor().getStatsCache().wakeInvalidator(); + } + } +}
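For readers wiring up the new aggregated stats cache, here is a minimal, hypothetical sketch (not part of the patch above, and the class name is illustrative only) of setting the four configuration properties this commit adds to HiveConf. The property names and default values are taken from the HiveConf hunk above; the generic Configuration.set(String, String) API inherited by HiveConf is used so nothing beyond what the patch shows is assumed.

import org.apache.hadoop.hive.conf.HiveConf;

public class AggrStatsCacheConfigSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Maximum number of aggregated stats objects kept in the in-memory cache (default 10000).
    conf.set("hive.metastore.hbase.aggr.stats.cache.entries", "10000");
    // How long a stats object stays in memory after it is read from HBase (default 60s).
    conf.set("hive.metastore.hbase.aggr.stats.memory.ttl", "60s");
    // How often the invalidator scans the HBase entries for expired aggregates (default 5s).
    conf.set("hive.metastore.hbase.aggr.stats.invalidator.frequency", "5s");
    // How long entries live in the HBase cache before expiring (default one week);
    // stats updates or partition drops may invalidate them earlier.
    conf.set("hive.metastore.hbase.aggr.stats.hbase.ttl", "604800s");
  }
}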