HIVE-11382 Invalidate aggregate column stats on alter partition (gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7e7f461b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7e7f461b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7e7f461b Branch: refs/heads/llap Commit: 7e7f461b0ba86e40224564e0ad1e320c4f6d62b3 Parents: 9d3d4eb Author: Alan Gates <ga...@hortonworks.com> Authored: Thu Jul 30 10:12:35 2015 -0700 Committer: Alan Gates <ga...@hortonworks.com> Committed: Thu Jul 30 10:12:35 2015 -0700 ---------------------------------------------------------------------- .../TestHBaseAggrStatsCacheIntegration.java | 192 +++++++++++++++++++ .../hadoop/hive/metastore/hbase/HBaseStore.java | 7 + 2 files changed, 199 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java index 7e6a2ef..ad76b2e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggrStatsCacheIntegration.java @@ -496,4 +496,196 @@ public class TestHBaseAggrStatsCacheIntegration extends HBaseIntegrationTests { store.backdoor().getStatsCache().wakeInvalidator(); } } + + @Test + public void alterInvalidation() throws Exception { + try { + String dbName = "default"; + String tableName = "ai"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + List<String> partVals3 = Arrays.asList("tomorrow"); + 
long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + Partition[] partitions = new Partition[3]; + int partnum = 0; + for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + partitions[partnum++] = part; + store.addPartition(part); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(10); + bcsd.setNumTrues(20); + bcsd.setNumNulls(30); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + } + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1")); + aggrStats = store.get_aggr_stats_for(dbName, tableName, + 
Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + // wake the invalidator and check again to make sure it isn't too aggressive about + // removing our stuff. + store.backdoor().getStatsCache().wakeInvalidator(); + + Partition newPart = new Partition(partitions[2]); + newPart.setLastAccessTime((int)System.currentTimeMillis()); + store.alterPartition(dbName, tableName, partVals3, newPart); + + store.backdoor().getStatsCache().setRunInvalidatorEvery(100); + store.backdoor().getStatsCache().wakeInvalidator(); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + + // Check that we missed, which means this aggregate was dropped from the cache. + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt()); + + // Check that our other aggregate is still in the cache. 
+ aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt()); + } finally { + store.backdoor().getStatsCache().setRunInvalidatorEvery(5000); + store.backdoor().getStatsCache().setMaxTimeInCache(500000); + store.backdoor().getStatsCache().wakeInvalidator(); + } + } + + @Test + public void altersInvalidation() throws Exception { + try { + String dbName = "default"; + String tableName = "asi"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + List<String> partVals3 = Arrays.asList("tomorrow"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int) now, (int) now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + Partition[] partitions = new Partition[3]; + int partnum = 0; + for (List<String> partVals : Arrays.asList(partVals1, partVals2, partVals3)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/invalidation/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + partitions[partnum++] = part; + store.addPartition(part); + + ColumnStatistics cs = 
new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(10); + bcsd.setNumTrues(20); + bcsd.setNumNulls(30); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + } + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=tomorrow"), Arrays.asList("col1")); + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().misses.getCnt()); + + // wake the invalidator and check again to make sure it isn't too aggressive about + // removing our stuff. 
+ store.backdoor().getStatsCache().wakeInvalidator(); + + Partition[] newParts = new Partition[2]; + newParts[0] = new Partition(partitions[0]); + newParts[0].setLastAccessTime((int)System.currentTimeMillis()); + newParts[1] = new Partition(partitions[2]); + newParts[1].setLastAccessTime((int) System.currentTimeMillis()); + store.alterPartitions(dbName, tableName, Arrays.asList(partVals1, partVals3), + Arrays.asList(newParts)); + + store.backdoor().getStatsCache().setRunInvalidatorEvery(100); + store.backdoor().getStatsCache().wakeInvalidator(); + + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=tomorrow", "ds=today"), Arrays.asList("col1")); + + // Check that we missed, which means this aggregate was dropped from the cache. + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(3, store.backdoor().getStatsCache().misses.getCnt()); + + // Check that our other aggregate got dropped too + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(4, store.backdoor().getStatsCache().misses.getCnt()); + } finally { + store.backdoor().getStatsCache().setRunInvalidatorEvery(5000); + store.backdoor().getStatsCache().setMaxTimeInCache(500000); + store.backdoor().getStatsCache().wakeInvalidator(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/7e7f461b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 
744070d..f8042fc 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -522,6 +522,9 @@ public class HBaseStore implements RawStore { try { Partition oldPart = getHBase().getPartition(db_name, tbl_name, part_vals); getHBase().replacePartition(oldPart, new_part); + // Drop any cached stats that reference this partition + getHBase().getStatsCache().invalidate(db_name, tbl_name, + buildExternalPartName(db_name, tbl_name, part_vals)); commit = true; } catch (IOException e) { LOG.error("Unable to add partition", e); @@ -540,6 +543,10 @@ public class HBaseStore implements RawStore { try { List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list); getHBase().replacePartitions(oldParts, new_parts); + for (List<String> part_vals : part_vals_list) { + getHBase().getStatsCache().invalidate(db_name, tbl_name, + buildExternalPartName(db_name, tbl_name, part_vals)); + } commit = true; } catch (IOException e) { LOG.error("Unable to add partition", e);