Repository: hive
Updated Branches:
  refs/heads/master 514ab795f -> 523830338
http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
new file mode 100644
index 0000000..89c3e7b
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/AggrStatsInvalidatorFilter.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.hbase;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hive.common.util.BloomFilter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Filter for scanning the aggregate stats table
+ */
+public class AggrStatsInvalidatorFilter extends FilterBase {
+  private static final Log LOG =
+      LogFactory.getLog(AggrStatsInvalidatorFilter.class.getName());
+  private final List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> entries;
+  private final long runEvery;
+  private final long maxCacheEntryLife;
+  // This class is not serializable, so I realize transient doesn't mean anything. It's just to
+  // communicate that we don't serialize this and ship it across to the filter on the other end.
+  // We use the time the filter is actually instantiated in HBase.
+  private transient long now;
+
+  public static Filter parseFrom(byte[] serialized) throws DeserializationException {
+    try {
+      return new AggrStatsInvalidatorFilter(
+          HbaseMetastoreProto.AggrStatsInvalidatorFilter.parseFrom(serialized));
+    } catch (InvalidProtocolBufferException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  /**
+   * @param proto Protocol buffer representation of this filter.
+   */
+  AggrStatsInvalidatorFilter(HbaseMetastoreProto.AggrStatsInvalidatorFilter proto) {
+    this.entries = proto.getToInvalidateList();
+    this.runEvery = proto.getRunEvery();
+    this.maxCacheEntryLife = proto.getMaxCacheEntryLife();
+    now = System.currentTimeMillis();
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
+        .addAllToInvalidate(entries)
+        .setRunEvery(runEvery)
+        .setMaxCacheEntryLife(maxCacheEntryLife)
+        .build()
+        .toByteArray();
+  }

+  @Override
+  public boolean filterAllRemaining() throws IOException {
+    return false;
+  }
+
+  @Override
+  public ReturnCode filterKeyValue(Cell cell) throws IOException {
+    // Is this the partition we want?
+    if (Arrays.equals(CellUtil.cloneQualifier(cell), HBaseReadWrite.AGGR_STATS_BLOOM_COL)) {
+      HbaseMetastoreProto.AggrStatsBloomFilter fromCol =
+          HbaseMetastoreProto.AggrStatsBloomFilter.parseFrom(CellUtil.cloneValue(cell));
+      BloomFilter bloom = null;
+      if (now - maxCacheEntryLife > fromCol.getAggregatedAt()) {
+        // It's too old, kill it regardless of whether we were asked to or not.
+        return ReturnCode.INCLUDE;
+      } else if (now - runEvery * 2 <= fromCol.getAggregatedAt()) {
+        // It's too new. We might be stomping on something that was just created. Skip it.
+        return ReturnCode.NEXT_ROW;
+      } else {
+        // Look through each of our entries and see if any of them match.
+        for (HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry : entries) {
+          // First check whether we match on db and table
+          if (entry.getDbName().equals(fromCol.getDbName()) &&
+              entry.getTableName().equals(fromCol.getTableName())) {
+            if (bloom == null) {
+              // Now, reconstitute the bloom filter and probe it with each of our partition names
+              bloom = new BloomFilter(
+                  fromCol.getBloomFilter().getBitsList(),
+                  fromCol.getBloomFilter().getNumBits(),
+                  fromCol.getBloomFilter().getNumFuncs());
+            }
+            if (bloom.test(entry.getPartName().toByteArray())) {
+              // This is most likely a match, so mark it and quit looking.
+              return ReturnCode.INCLUDE;
+            }
+          }
+        }
+      }
+      return ReturnCode.NEXT_ROW;
+    } else {
+      return ReturnCode.NEXT_COL;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
index 6171fab..2359939 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/Counter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hive.metastore.hbase;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A simple metric to count how many times something occurs.
*/ @@ -44,4 +46,8 @@ class Counter { return bldr.toString(); } + @VisibleForTesting long getCnt() { + return cnt; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index fd6f9f5..332e30a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -37,9 +37,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.RegexStringComparator; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hive.common.ObjectPair; -import org.apache.hive.common.util.BloomFilter; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.AggregateStatsCache; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; @@ -53,8 +51,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator; -import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory; +import org.apache.hive.common.util.BloomFilter; import java.io.IOException; import java.security.MessageDigest; @@ -62,6 +59,7 @@ import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -75,6 +73,7 @@ import java.util.Set; */ class HBaseReadWrite { + @VisibleForTesting final static String AGGR_STATS_TABLE = "HBMS_AGGR_STATS"; @VisibleForTesting final static String DB_TABLE = "HBMS_DBS"; @VisibleForTesting final static String FUNC_TABLE = "HBMS_FUNCS"; @VisibleForTesting final static String GLOBAL_PRIVS_TABLE = "HBMS_GLOBAL_PRIVS"; @@ -89,12 +88,14 @@ class HBaseReadWrite { /** * List of tables in HBase */ - final static String[] tableNames = { DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, PART_TABLE, - USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, TABLE_TABLE }; + final static String[] tableNames = { AGGR_STATS_TABLE, DB_TABLE, FUNC_TABLE, GLOBAL_PRIVS_TABLE, + PART_TABLE, USER_TO_ROLE_TABLE, ROLE_TABLE, SD_TABLE, + TABLE_TABLE }; final static Map<String, List<byte[]>> columnFamilies = new HashMap<String, List<byte[]>> (tableNames.length); static { + columnFamilies.put(AGGR_STATS_TABLE, Arrays.asList(CATALOG_CF)); columnFamilies.put(DB_TABLE, Arrays.asList(CATALOG_CF)); columnFamilies.put(FUNC_TABLE, Arrays.asList(CATALOG_CF)); columnFamilies.put(GLOBAL_PRIVS_TABLE, Arrays.asList(CATALOG_CF)); @@ -105,11 +106,21 @@ class HBaseReadWrite { columnFamilies.put(TABLE_TABLE, Arrays.asList(CATALOG_CF, STATS_CF)); } - private final static byte[] CATALOG_COL = "cat".getBytes(HBaseUtils.ENCODING); + /** + * Stores the bloom filter for the aggregated stats, to determine what partitions are in this + * aggregate. 
+ */ + final static byte[] AGGR_STATS_BLOOM_COL = "b".getBytes(HBaseUtils.ENCODING); + private final static byte[] CATALOG_COL = "c".getBytes(HBaseUtils.ENCODING); private final static byte[] ROLES_COL = "roles".getBytes(HBaseUtils.ENCODING); private final static byte[] REF_COUNT_COL = "ref".getBytes(HBaseUtils.ENCODING); - private final static byte[] GLOBAL_PRIVS_KEY = "globalprivs".getBytes(HBaseUtils.ENCODING); + private final static byte[] AGGR_STATS_STATS_COL = "s".getBytes(HBaseUtils.ENCODING); + private final static byte[] GLOBAL_PRIVS_KEY = "gp".getBytes(HBaseUtils.ENCODING); private final static int TABLES_TO_CACHE = 10; + // False positives are very bad here because they cause us to invalidate entries we shouldn't. + // Space used and # of hash functions grows in proportion to ln of num bits so a 10x increase + // in accuracy doubles the required space and number of hash functions. + private final static double STATS_BF_ERROR_RATE = 0.001; @VisibleForTesting final static String TEST_CONN = "test_connection"; private static HBaseConnection testConn; @@ -135,7 +146,7 @@ class HBaseReadWrite { private ObjectCache<ObjectPair<String, String>, Table> tableCache; private ObjectCache<ByteArrayWrapper, StorageDescriptor> sdCache; private PartitionCache partCache; - private AggregateStatsCache aggrStatsCache; + private StatsCache statsCache; private Counter tableHits; private Counter tableMisses; private Counter tableOverflows; @@ -239,8 +250,8 @@ class HBaseReadWrite { sdCache = new ObjectCache<ByteArrayWrapper, StorageDescriptor>(sdsCacheSize, sdHits, sdMisses, sdOverflows); partCache = new PartitionCache(totalCatalogObjectsToCache, partHits, partMisses, partOverflows); - aggrStatsCache = AggregateStatsCache.getInstance(conf); } + statsCache = StatsCache.getInstance(conf); roleCache = new HashMap<String, HbaseMetastoreProto.RoleGrantInfoList>(); entireRoleTableInCache = false; } @@ -252,14 +263,6 @@ class HBaseReadWrite { if (self.get().conn.getHBaseTable(name, true) == null) { List<byte[]> families = columnFamilies.get(name); self.get().conn.createHBaseTable(name, families); - /* - List<byte[]> columnFamilies = new ArrayList<byte[]>(); - columnFamilies.add(CATALOG_CF); - if (TABLE_TABLE.equals(name) || PART_TABLE.equals(name)) { - columnFamilies.add(STATS_CF); - } - self.get().conn.createHBaseTable(name, columnFamilies); - */ } } tablesCreated = true; @@ -1465,13 +1468,12 @@ class HBaseReadWrite { * * @param dbName database the table is in * @param tableName table to update statistics for - * @param partName name of the partition, can be null if these are table level statistics. * @param partVals partition values that define partition to update statistics for. If this is * null, then these will be assumed to be table level statistics * @param stats Stats object with stats for one or more columns * @throws IOException */ - void updateStatistics(String dbName, String tableName, String partName, List<String> partVals, + void updateStatistics(String dbName, String tableName, List<String> partVals, ColumnStatistics stats) throws IOException { byte[] key = getStatisticsKey(dbName, tableName, partVals); String hbaseTable = getStatisticsTable(partVals); @@ -1534,171 +1536,154 @@ class HBaseReadWrite { * to translate from partName to partVals * @param colNames column names to fetch stats for. These columns will be fetched for all * requested partitions - * @return list of ColumnStats, one for each partition. 
The values will be in the same order as - * the partNames list that was passed in + * @return list of ColumnStats, one for each partition for which we found at least one column's + * stats. * @throws IOException */ List<ColumnStatistics> getPartitionStatistics(String dbName, String tblName, List<String> partNames, List<List<String>> partVals, List<String> colNames) throws IOException { - List<ColumnStatistics> statsList = new ArrayList<ColumnStatistics>(partNames.size()); - ColumnStatistics partitionStats; - ColumnStatisticsDesc statsDesc; - byte[][] colKeys = new byte[colNames.size()][]; - List<Get> gets = new ArrayList<Get>(); - // Initialize the list and build the Gets - for (int pOff = 0; pOff < partNames.size(); pOff++) { - // Add an entry for this partition in the stats list - partitionStats = new ColumnStatistics(); - statsDesc = new ColumnStatisticsDesc(); - statsDesc.setIsTblLevel(false); - statsDesc.setDbName(dbName); - statsDesc.setTableName(tblName); - statsDesc.setPartName(partNames.get(pOff)); - partitionStats.setStatsDesc(statsDesc); - statsList.add(partitionStats); - // Build the list of Gets - for (int i = 0; i < colKeys.length; i++) { - colKeys[i] = HBaseUtils.buildKey(colNames.get(i)); - } - byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff)); + List<ColumnStatistics> statsList = new ArrayList<>(partNames.size()); + Map<List<String>, String> valToPartMap = new HashMap<>(partNames.size()); + List<Get> gets = new ArrayList<>(partNames.size() * colNames.size()); + assert partNames.size() == partVals.size(); + + byte[][] colNameBytes = new byte[colNames.size()][]; + for (int i = 0; i < colNames.size(); i++) { + colNameBytes[i] = HBaseUtils.buildKey(colNames.get(i)); + } + + for (int i = 0; i < partNames.size(); i++) { + valToPartMap.put(partVals.get(i), partNames.get(i)); + byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i)); Get get = new Get(partKey); - for (byte[] colName : colKeys) { + for (byte[] colName : colNameBytes) { get.addColumn(STATS_CF, colName); } gets.add(get); } HTableInterface htab = conn.getHBaseTable(PART_TABLE); - // Get results from HBase Result[] results = htab.get(gets); - // Deserialize the stats objects and add to stats list - for (int pOff = 0; pOff < results.length; pOff++) { - for (int cOff = 0; cOff < colNames.size(); cOff++) { - byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKeys[cOff]); - if (serializedColStats == null) { - // There were no stats for this column, so skip it - continue; + for (int i = 0; i < results.length; i++) { + ColumnStatistics colStats = null; + for (int j = 0; j < colNameBytes.length; j++) { + byte[] serializedColStats = results[i].getValue(STATS_CF, colNameBytes[j]); + if (serializedColStats != null) { + if (colStats == null) { + // We initialize this late so that we don't create extras in the case of + // partitions with no stats + colStats = new ColumnStatistics(); + statsList.add(colStats); + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(); + + // We need to figure out which partition these call stats are from. To do that we + // recontruct the key. We have to pull the dbName and tableName out of the key to + // find the partition values. 
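// Editorial sketch (not part of this patch) of the key layout that the subList(2, ...) call
// below relies on: HBaseUtils.buildPartitionKey joins [dbName, tableName, partVal1, ...] with
// the key separator and HBaseUtils.parseKey splits it back apart, e.g. (values borrowed from
// the unit test, purely illustrative):
//   byte[] k = HBaseUtils.buildPartitionKey("default", "hit", Arrays.asList("today"));
//   String[] parts = HBaseUtils.parseKey(k);                            // ["default", "hit", "today"]
//   List<String> vals = Arrays.asList(parts).subList(2, parts.length);  // ["today"]
// so elements 2..n of the parsed key are the partition values used to look up the partName.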
+ byte[] key = results[i].getRow(); + String[] reconstructedKey = HBaseUtils.parseKey(key); + List<String> reconstructedPartVals = + Arrays.asList(reconstructedKey).subList(2, reconstructedKey.length); + String partName = valToPartMap.get(reconstructedPartVals); + assert partName != null; + csd.setIsTblLevel(false); + csd.setDbName(dbName); + csd.setTableName(tblName); + csd.setPartName(partName); + colStats.setStatsDesc(csd); + } + ColumnStatisticsObj cso = + HBaseUtils.deserializeStatsForOneColumn(colStats, serializedColStats); + cso.setColName(colNames.get(j)); + colStats.addToStatsObj(cso); } - partitionStats = statsList.get(pOff); - ColumnStatisticsObj colStats = - HBaseUtils.deserializeStatsForOneColumn(partitionStats, serializedColStats); - colStats.setColName(colNames.get(cOff)); - partitionStats.addToStatsObj(colStats); } } + return statsList; } /** - * Get aggregate stats for a column from the DB and populate the bloom filter if it's not null - * @param dbName - * @param tblName - * @param partNames - * @param partVals - * @param colNames - * @return + * Get a reference to the stats cache. + * @return the stats cache. + */ + StatsCache getStatsCache() { + return statsCache; + } + + /** + * Get aggregated stats. Only intended for use by + * {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}. Others should not call directly + * but should call StatsCache.get instead. + * @param key The md5 hash associated with this partition set + * @return stats if hbase has them, else null * @throws IOException */ - AggrStats getAggrStats(String dbName, String tblName, List<String> partNames, - List<List<String>> partVals, List<String> colNames) throws IOException { - // One ColumnStatisticsObj per column - List<ColumnStatisticsObj> colStatsList = new ArrayList<ColumnStatisticsObj>(); - AggregateStatsCache.AggrColStats colStatsAggrCached; - ColumnStatisticsObj colStatsAggr; - int maxPartitionsPerCacheNode = aggrStatsCache.getMaxPartsPerCacheNode(); - float falsePositiveProbability = aggrStatsCache.getFalsePositiveProbability(); - int partitionsRequested = partNames.size(); - // TODO: Steal extrapolation logic from current MetaStoreDirectSql code - // Right now doing nothing and keeping partitionsFound == partitionsRequested - int partitionsFound = partitionsRequested; - for (String colName : colNames) { - if (partitionsRequested > maxPartitionsPerCacheNode) { - // Read from HBase but don't add to cache since it doesn't qualify the criteria - colStatsAggr = getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, null); - colStatsList.add(colStatsAggr); - } else { - // Check the cache first - colStatsAggrCached = aggrStatsCache.get(dbName, tblName, colName, partNames); - if (colStatsAggrCached != null) { - colStatsList.add(colStatsAggrCached.getColStats()); - } else { - // Bloom filter for the new node that we will eventually add to the cache - BloomFilter bloomFilter = - new BloomFilter(maxPartitionsPerCacheNode, falsePositiveProbability); - colStatsAggr = - getAggrStatsFromDB(dbName, tblName, colName, partNames, partVals, bloomFilter); - colStatsList.add(colStatsAggr); - // Update the cache to add this new aggregate node - aggrStatsCache.add(dbName, tblName, colName, partitionsFound, colStatsAggr, bloomFilter); - } - } - } - return new AggrStats(colStatsList, partitionsFound); + AggrStats getAggregatedStats(byte[] key) throws IOException{ + byte[] serialized = read(AGGR_STATS_TABLE, key, CATALOG_CF, AGGR_STATS_STATS_COL); + if (serialized == null) return null; + return 
HBaseUtils.deserializeAggrStats(serialized); } /** - * - * @param dbName - * @param tblName - * @param partNames - * @param partVals - * @param colName - * @param bloomFilter - * @return + * Put aggregated stats Only intended for use by + * {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}. Others should not call directly + * but should call StatsCache.put instead. + * @param key The md5 hash associated with this partition set + * @param dbName Database these partitions are in + * @param tableName Table these partitions are in + * @param partNames Partition names + * @param colName Column stats are for + * @param stats Stats + * @throws IOException */ - private ColumnStatisticsObj getAggrStatsFromDB(String dbName, String tblName, String colName, - List<String> partNames, List<List<String>> partVals, BloomFilter bloomFilter) + void putAggregatedStats(byte[] key, String dbName, String tableName, List<String> partNames, + String colName, AggrStats stats) throws IOException { + // Serialize the part names + List<String> protoNames = new ArrayList<>(partNames.size() + 3); + protoNames.add(dbName); + protoNames.add(tableName); + protoNames.add(colName); + protoNames.addAll(partNames); + // Build a bloom Filter for these partitions + BloomFilter bloom = new BloomFilter(partNames.size(), STATS_BF_ERROR_RATE); + for (String partName : partNames) { + bloom.add(partName.getBytes(HBaseUtils.ENCODING)); + } + byte[] serializedFilter = HBaseUtils.serializeBloomFilter(dbName, tableName, bloom); + + byte[] serializedStats = HBaseUtils.serializeAggrStats(stats); + store(AGGR_STATS_TABLE, key, CATALOG_CF, + new byte[][]{AGGR_STATS_BLOOM_COL, AGGR_STATS_STATS_COL}, + new byte[][]{serializedFilter, serializedStats}); + } + + // TODO - We shouldn't remove an entry from the cache as soon as a single partition is deleted. + // TODO - Instead we should keep track of how many partitions have been deleted and only remove + // TODO - an entry once it passes a certain threshold, like 5%, of partitions have been removed. + // TODO - That requires moving this from a filter to a co-processor. + /** + * Invalidate stats associated with the listed partitions. This method is intended for use + * only by {@link org.apache.hadoop.hive.metastore.hbase.StatsCache}. + * @param filter serialized version of the filter to pass + * @return List of md5 hash keys for the partition stat sets that were removed. 
+ * @throws IOException + */ + List<StatsCache.StatsCacheKey> + invalidateAggregatedStats(HbaseMetastoreProto.AggrStatsInvalidatorFilter filter) throws IOException { - ColumnStatisticsObj colStatsAggr = new ColumnStatisticsObj(); - boolean colStatsAggrInited = false; - ColumnStatsAggregator colStatsAggregator = null; - List<Get> gets = new ArrayList<Get>(); - byte[] colKey = HBaseUtils.buildKey(colName); - // Build a list of Gets, one per partition - for (int pOff = 0; pOff < partNames.size(); pOff++) { - byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(pOff)); - Get get = new Get(partKey); - get.addColumn(STATS_CF, colKey); - gets.add(get); - } - HTableInterface htab = conn.getHBaseTable(PART_TABLE); - // Get results from HBase - Result[] results = htab.get(gets); - // Iterate through the results - // The results size and order is the same as the number and order of the Gets - // If the column is not present in a partition, the Result object will be empty - for (int pOff = 0; pOff < partNames.size(); pOff++) { - if (results[pOff].isEmpty()) { - // There were no stats for this column, so skip it - continue; - } - byte[] serializedColStats = results[pOff].getValue(STATS_CF, colKey); - if (serializedColStats == null) { - // There were no stats for this column, so skip it - continue; - } - ColumnStatisticsObj colStats = - HBaseUtils.deserializeStatsForOneColumn(null, serializedColStats); - if (!colStatsAggrInited) { - // This is the 1st column stats object we got - colStatsAggr.setColName(colName); - colStatsAggr.setColType(colStats.getColType()); - colStatsAggr.setStatsData(colStats.getStatsData()); - colStatsAggregator = - ColumnStatsAggregatorFactory.getColumnStatsAggregator(colStats.getStatsData() - .getSetField()); - colStatsAggrInited = true; - } else { - // Perform aggregation with whatever we've already aggregated - colStatsAggregator.aggregate(colStatsAggr, colStats); - } - // Add partition to the bloom filter if it's requested - if (bloomFilter != null) { - bloomFilter.add(partNames.get(pOff).getBytes()); - } + Iterator<Result> results = scan(AGGR_STATS_TABLE, new AggrStatsInvalidatorFilter(filter)); + if (!results.hasNext()) return Collections.emptyList(); + List<Delete> deletes = new ArrayList<>(); + List<StatsCache.StatsCacheKey> keys = new ArrayList<>(); + while (results.hasNext()) { + Result result = results.next(); + deletes.add(new Delete(result.getRow())); + keys.add(new StatsCache.StatsCacheKey(result.getRow())); } - return colStatsAggr; + HTableInterface htab = conn.getHBaseTable(AGGR_STATS_TABLE); + htab.delete(deletes); + return keys; } private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) { @@ -1718,9 +1703,12 @@ class HBaseReadWrite { * This should be called whenever a new query is started. 
*/ void flushCatalogCache() { - for (Counter counter : counters) { - LOG.debug(counter.dump()); - counter.clear(); + if (LOG.isDebugEnabled()) { + for (Counter counter : counters) { + LOG.debug(counter.dump()); + counter.clear(); + } + statsCache.dumpCounters(); } tableCache.flush(); sdCache.flush(); @@ -1794,6 +1782,10 @@ class HBaseReadWrite { return scan(table, null, null, colFam, colName, filter); } + private Iterator<Result> scan(String table, Filter filter) throws IOException { + return scan(table, null, null, null, null, filter); + } + private Iterator<Result> scan(String table, byte[] keyStart, byte[] keyEnd, byte[] colFam, byte[] colName, Filter filter) throws IOException { HTableInterface htab = conn.getHBaseTable(table); @@ -1804,7 +1796,9 @@ class HBaseReadWrite { if (keyEnd != null) { s.setStopRow(keyEnd); } - s.addColumn(colFam, colName); + if (colFam != null && colName != null) { + s.addColumn(colFam, colName); + } if (filter != null) { s.setFilter(filter); } http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 4fa2ae5..9782859 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hive.metastore.hbase; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheLoader; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -370,6 +372,9 @@ public class HBaseStore implements RawStore { openTransaction(); try { getHBase().deletePartition(dbName, tableName, part_vals); + // Drop any cached stats that reference this partitions + getHBase().getStatsCache().invalidate(dbName, tableName, + buildExternalPartName(dbName, tableName, part_vals)); commit = true; return true; } catch (IOException e) { @@ -1472,7 +1477,7 @@ public class HBaseStore implements RawStore { openTransaction(); try { getHBase().updateStatistics(colStats.getStatsDesc().getDbName(), - colStats.getStatsDesc().getTableName(), null, null, colStats); + colStats.getStatsDesc().getTableName(), null, colStats); commit = true; return true; } catch (IOException e) { @@ -1491,8 +1496,10 @@ public class HBaseStore implements RawStore { openTransaction(); try { getHBase().updateStatistics(statsObj.getStatsDesc().getDbName(), - statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName(), - partVals, statsObj); + statsObj.getStatsDesc().getTableName(), partVals, statsObj); + // We need to invalidate aggregates that include this partition + getHBase().getStatsCache().invalidate(statsObj.getStatsDesc().getDbName(), + statsObj.getStatsDesc().getTableName(), statsObj.getStatsDesc().getPartName()); commit = true; return true; } catch (IOException e) { @@ -1528,7 +1535,6 @@ public class HBaseStore implements RawStore { for (String partName : partNames) { partVals.add(partNameToVals(partName)); } - for (String partName : partNames) partVals.add(partNameToVals(partName)); boolean commit = false; openTransaction(); try { @@ -1574,9 +1580,24 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - 
AggrStats stats = getHBase().getAggrStats(dbName, tblName, partNames, partVals, colNames); + AggrStats aggrStats = new AggrStats(); + for (String colName : colNames) { + try { + AggrStats oneCol = + getHBase().getStatsCache().get(dbName, tblName, partNames, colName); + if (oneCol.getColStatsSize() > 0) { + assert oneCol.getColStatsSize() == 1; + aggrStats.setPartsFound(aggrStats.getPartsFound() + oneCol.getPartsFound()); + aggrStats.addToColStats(oneCol.getColStats().get(0)); + } + } catch (CacheLoader.InvalidCacheLoadException e) { + LOG.debug("Found no stats for column " + colName); + // This means we have no stats at all for this column for these partitions, so just + // move on. + } + } commit = true; - return stats; + return aggrStats; } catch (IOException e) { LOG.error("Unable to fetch aggregate column statistics", e); throw new MetaException("Failed fetching aggregate column statistics, " + e.getMessage()); @@ -2068,7 +2089,7 @@ public class HBaseStore implements RawStore { return FileUtils.makePartName(partCols, partVals); } - private List<String> partNameToVals(String name) { + private static List<String> partNameToVals(String name) { if (name == null) return null; List<String> vals = new ArrayList<String>(); String[] kvp = name.split("/"); @@ -2078,6 +2099,14 @@ public class HBaseStore implements RawStore { return vals; } + static List<List<String>> partNameListToValsList(List<String> partNames) { + List<List<String>> valLists = new ArrayList<List<String>>(partNames.size()); + for (String partName : partNames) { + valLists.add(partNameToVals(partName)); + } + return valLists; + } + private String likeToRegex(String like) { if (like == null) return null; // Convert Hive's strange like syntax to Java regex. Per @@ -2097,4 +2126,8 @@ public class HBaseStore implements RawStore { rollbackTransaction(); } } + + @VisibleForTesting HBaseReadWrite backdoor() { + return getHBase(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 969c979..4d57af2 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -23,11 +23,11 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Decimal; @@ -50,7 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.thrift.TFieldIdEnum; 
+import org.apache.hive.common.util.BloomFilter; import java.io.IOException; import java.nio.charset.Charset; @@ -98,6 +98,11 @@ class HBaseUtils { return protoKey.getBytes(ENCODING); } + static String[] parseKey(byte[] serialized) { + String munged = new String(serialized, ENCODING); + return munged.split(KEY_SEPARATOR_STR); + } + private static HbaseMetastoreProto.Parameters buildParameters(Map<String, String> params) { List<HbaseMetastoreProto.ParameterEntry> entries = new ArrayList<HbaseMetastoreProto.ParameterEntry>(); @@ -902,14 +907,40 @@ class HBaseUtils { return sdParts; } - static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats) + static byte[] serializeBloomFilter(String dbName, String tableName, BloomFilter bloom) { + long[] bitSet = bloom.getBitSet(); + List<Long> bits = new ArrayList<>(bitSet.length); + for (int i = 0; i < bitSet.length; i++) bits.add(bitSet[i]); + HbaseMetastoreProto.AggrStatsBloomFilter.BloomFilter protoBloom = + HbaseMetastoreProto.AggrStatsBloomFilter.BloomFilter.newBuilder() + .setNumBits(bloom.getBitSize()) + .setNumFuncs(bloom.getNumHashFunctions()) + .addAllBits(bits) + .build(); + + HbaseMetastoreProto.AggrStatsBloomFilter proto = + HbaseMetastoreProto.AggrStatsBloomFilter.newBuilder() + .setDbName(ByteString.copyFrom(dbName.getBytes(ENCODING))) + .setTableName(ByteString.copyFrom(tableName.getBytes(ENCODING))) + .setBloomFilter(protoBloom) + .setAggregatedAt(System.currentTimeMillis()) + .build(); + + return proto.toByteArray(); + } + + private static HbaseMetastoreProto.ColumnStats + protoBufStatsForOneColumn(ColumnStatistics partitionColumnStats, ColumnStatisticsObj colStats) throws IOException { HbaseMetastoreProto.ColumnStats.Builder builder = HbaseMetastoreProto.ColumnStats.newBuilder(); - builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed()); - if (colStats.getColType() == null) { - throw new RuntimeException("Column type must be set"); + if (partitionColumnStats != null) { + builder.setLastAnalyzed(partitionColumnStats.getStatsDesc().getLastAnalyzed()); } + assert colStats.getColType() != null; builder.setColumnType(colStats.getColType()); + assert colStats.getColName() != null; + builder.setColumnName(colStats.getColName()); + ColumnStatisticsData colData = colStats.getStatsData(); switch (colData.getSetField()) { case BOOLEAN_STATS: @@ -987,12 +1018,23 @@ class HBaseUtils { default: throw new RuntimeException("Woh, bad. 
Unknown stats type!"); } - return builder.build().toByteArray(); + return builder.build(); + } + + static byte[] serializeStatsForOneColumn(ColumnStatistics partitionColumnStats, + ColumnStatisticsObj colStats) throws IOException { + return protoBufStatsForOneColumn(partitionColumnStats, colStats).toByteArray(); } static ColumnStatisticsObj deserializeStatsForOneColumn(ColumnStatistics partitionColumnStats, byte[] bytes) throws IOException { HbaseMetastoreProto.ColumnStats proto = HbaseMetastoreProto.ColumnStats.parseFrom(bytes); + return statsForOneColumnFromProtoBuf(partitionColumnStats, proto); + } + + private static ColumnStatisticsObj + statsForOneColumnFromProtoBuf(ColumnStatistics partitionColumnStats, + HbaseMetastoreProto.ColumnStats proto) throws IOException { ColumnStatisticsObj colStats = new ColumnStatisticsObj(); long lastAnalyzed = proto.getLastAnalyzed(); if (partitionColumnStats != null) { @@ -1000,6 +1042,7 @@ class HBaseUtils { Math.max(lastAnalyzed, partitionColumnStats.getStatsDesc().getLastAnalyzed())); } colStats.setColType(proto.getColumnType()); + colStats.setColName(proto.getColumnName()); ColumnStatisticsData colData = new ColumnStatisticsData(); if (proto.hasBoolStats()) { @@ -1067,6 +1110,30 @@ class HBaseUtils { return colStats; } + static byte[] serializeAggrStats(AggrStats aggrStats) throws IOException { + List<HbaseMetastoreProto.ColumnStats> protoColStats = + new ArrayList<>(aggrStats.getColStatsSize()); + for (ColumnStatisticsObj cso : aggrStats.getColStats()) { + protoColStats.add(protoBufStatsForOneColumn(null, cso)); + } + return HbaseMetastoreProto.AggrStats.newBuilder() + .setPartsFound(aggrStats.getPartsFound()) + .addAllColStats(protoColStats) + .build() + .toByteArray(); + } + + static AggrStats deserializeAggrStats(byte[] serialized) throws IOException { + HbaseMetastoreProto.AggrStats protoAggrStats = + HbaseMetastoreProto.AggrStats.parseFrom(serialized); + AggrStats aggrStats = new AggrStats(); + aggrStats.setPartsFound(protoAggrStats.getPartsFound()); + for (HbaseMetastoreProto.ColumnStats protoCS : protoAggrStats.getColStatsList()) { + aggrStats.addToColStats(statsForOneColumnFromProtoBuf(null, protoCS)); + } + return aggrStats; + } + /** * @param keyStart byte array representing the start prefix * @return byte array corresponding to the next possible prefix http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java new file mode 100644 index 0000000..0d3ed40 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/StatsCache.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.ByteString; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregator; +import org.apache.hadoop.hive.metastore.hbase.stats.ColumnStatsAggregatorFactory; + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A cache for stats. This is only intended for use by + * {@link org.apache.hadoop.hive.metastore.hbase.HBaseReadWrite} and should not be used outside + * that class. 
+ */ +class StatsCache { + + private static final Log LOG = LogFactory.getLog(StatsCache.class.getName()); + private static StatsCache self = null; + + private LoadingCache<StatsCacheKey, AggrStats> cache; + private Invalidator invalidator; + private long runInvalidatorEvery; + private long maxTimeInCache; + private boolean invalidatorHasRun; + + @VisibleForTesting Counter misses; + @VisibleForTesting Counter hbaseHits; + @VisibleForTesting Counter totalGets; + + static synchronized StatsCache getInstance(Configuration conf) { + if (self == null) { + self = new StatsCache(conf); + } + return self; + } + + private StatsCache(Configuration conf) { + final StatsCache me = this; + cache = CacheBuilder.newBuilder() + .maximumSize( + HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_CACHE_ENTRIES)) + .expireAfterWrite(HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL, TimeUnit.SECONDS), TimeUnit.SECONDS) + .build(new CacheLoader<StatsCacheKey, AggrStats>() { + @Override + public AggrStats load(StatsCacheKey key) throws Exception { + HBaseReadWrite hrw = HBaseReadWrite.getInstance(); + AggrStats aggrStats = hrw.getAggregatedStats(key.hashed); + if (aggrStats == null) { + misses.incr(); + ColumnStatsAggregator aggregator = null; + ColumnStatisticsObj statsObj = null; + aggrStats = new AggrStats(); + LOG.debug("Unable to find aggregated stats for " + key.colName + ", aggregating"); + List<ColumnStatistics> css = hrw.getPartitionStatistics(key.dbName, key.tableName, + key.partNames, HBaseStore.partNameListToValsList(key.partNames), + Collections.singletonList(key.colName)); + if (css != null && css.size() > 0) { + aggrStats.setPartsFound(css.size()); + for (ColumnStatistics cs : css) { + for (ColumnStatisticsObj cso : cs.getStatsObj()) { + if (statsObj == null) { + statsObj = ColumnStatsAggregatorFactory.newColumnStaticsObj(key.colName, + cso.getStatsData().getSetField()); + } + if (aggregator == null) { + aggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator( + cso.getStatsData().getSetField()); + } + aggregator.aggregate(statsObj, cso); + } + } + aggrStats.addToColStats(statsObj); + me.put(key, aggrStats); + } + } else { + hbaseHits.incr(); + } + return aggrStats; + } + }); + misses = new Counter("Stats cache table misses"); + hbaseHits = new Counter("Stats cache table hits"); + totalGets = new Counter("Total get calls to the stats cache"); + + maxTimeInCache = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL, TimeUnit.SECONDS); + // We want runEvery in milliseconds, even though we give the default value in the conf in + // seconds. + runInvalidatorEvery = HiveConf.getTimeVar(conf, + HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY, TimeUnit.MILLISECONDS); + + invalidator = new Invalidator(); + invalidator.setDaemon(true); + invalidator.start(); + } + + /** + * Add an object to the cache. 
+ * @param key Key for this entry + * @param aggrStats stats + * @throws java.io.IOException + */ + void put(StatsCacheKey key, AggrStats aggrStats) throws IOException { + HBaseReadWrite.getInstance().putAggregatedStats(key.hashed, key.dbName, key.tableName, + key.partNames, + key.colName, aggrStats); + cache.put(key, aggrStats); + } + + /** + * Get partition level statistics + * @param dbName name of database table is in + * @param tableName name of table + * @param partNames names of the partitions + * @param colName of column to get stats for + * @return stats object for this column, or null if none cached + * @throws java.io.IOException + */ + + AggrStats get(String dbName, String tableName, List<String> partNames, String colName) + throws IOException { + totalGets.incr(); + StatsCacheKey key = new StatsCacheKey(dbName, tableName, partNames, colName); + try { + return cache.get(key); + } catch (ExecutionException e) { + throw new IOException(e); + } + } + + /** + * Remove all entries that are related to a particular set of partitions. This should be + * called when partitions are deleted or stats are updated. + * @param dbName name of database table is in + * @param tableName name of table + * @param partName name of the partition + * @throws IOException + */ + void invalidate(String dbName, String tableName, String partName) + throws IOException { + invalidator.addToQueue( + HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry.newBuilder() + .setDbName(ByteString.copyFrom(dbName.getBytes(HBaseUtils.ENCODING))) + .setTableName(ByteString.copyFrom(tableName.getBytes(HBaseUtils.ENCODING))) + .setPartName(ByteString.copyFrom(partName.getBytes(HBaseUtils.ENCODING))) + .build()); + } + + void dumpCounters() { + LOG.debug(misses.dump()); + LOG.debug(hbaseHits.dump()); + LOG.debug(totalGets.dump()); + } + + /** + * Completely dump the cache from memory, used to test that we can access stats from HBase itself. + * @throws IOException + */ + @VisibleForTesting void flushMemory() throws IOException { + cache.invalidateAll(); + } + + @VisibleForTesting void resetCounters() { + misses.clear(); + hbaseHits.clear(); + totalGets.clear(); + } + + @VisibleForTesting void setRunInvalidatorEvery(long runEvery) { + runInvalidatorEvery = runEvery; + } + + @VisibleForTesting void setMaxTimeInCache(long maxTime) { + maxTimeInCache = maxTime; + } + + @VisibleForTesting void wakeInvalidator() throws InterruptedException { + invalidatorHasRun = false; + // Wait through 2 cycles so we're sure our entry won't be picked as too new. 
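// (Editorial note, not part of this patch: the "2 cycles" mirrors
// AggrStatsInvalidatorFilter.filterKeyValue(), which returns NEXT_ROW for any entry whose
// aggregatedAt is newer than now - 2 * runEvery, so sleeping for two invalidator periods
// before interrupting guarantees the test entry is old enough to be eligible for invalidation.)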
+ Thread.sleep(2 * runInvalidatorEvery); + invalidator.interrupt(); + while (!invalidatorHasRun) { + Thread.sleep(10); + } + } + + static class StatsCacheKey { + final byte[] hashed; + String dbName; + String tableName; + List<String> partNames; + String colName; + private MessageDigest md; + + StatsCacheKey(byte[] key) { + hashed = key; + } + + StatsCacheKey(String dbName, String tableName, List<String> partNames, String colName) { + this.dbName = dbName; + this.tableName = tableName; + this.partNames = partNames; + this.colName = colName; + + try { + md = MessageDigest.getInstance("MD5"); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + md.update(dbName.getBytes(HBaseUtils.ENCODING)); + md.update(tableName.getBytes(HBaseUtils.ENCODING)); + Collections.sort(this.partNames); + for (String s : partNames) { + md.update(s.getBytes(HBaseUtils.ENCODING)); + } + md.update(colName.getBytes(HBaseUtils.ENCODING)); + hashed = md.digest(); + } + + @Override + public boolean equals(Object other) { + if (other == null || !(other instanceof StatsCacheKey)) return false; + StatsCacheKey that = (StatsCacheKey)other; + return Arrays.equals(hashed, that.hashed); + } + + @Override + public int hashCode() { + return Arrays.hashCode(hashed); + } + } + + private class Invalidator extends Thread { + private List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> entries = new ArrayList<>(); + private Lock lock = new ReentrantLock(); + + void addToQueue(HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry) { + lock.lock(); + try { + entries.add(entry); + } finally { + lock.unlock(); + } + } + + @Override + public void run() { + while (true) { + long startedAt = System.currentTimeMillis(); + List<HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry> thisRun = null; + lock.lock(); + try { + if (entries.size() > 0) { + thisRun = entries; + entries = new ArrayList<>(); + } + } finally { + lock.unlock(); + } + + if (thisRun != null) { + try { + HbaseMetastoreProto.AggrStatsInvalidatorFilter filter = + HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder() + .setRunEvery(runInvalidatorEvery) + .setMaxCacheEntryLife(maxTimeInCache) + .addAllToInvalidate(thisRun) + .build(); + List<StatsCacheKey> keys = + HBaseReadWrite.getInstance().invalidateAggregatedStats(filter); + cache.invalidateAll(keys); + } catch (IOException e) { + // Not a lot I can do here + LOG.error("Caught error while invalidating entries in the cache", e); + } + } + invalidatorHasRun = true; + + try { + sleep(runInvalidatorEvery - (System.currentTimeMillis() - startedAt)); + } catch (InterruptedException e) { + LOG.warn("Interupted while sleeping", e); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java index 3fa0614..ebecfe3 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/ColumnStatsAggregatorFactory.java @@ -19,7 +19,15 @@ package org.apache.hadoop.hive.metastore.hbase.stats; +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; +import 
org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; public class ColumnStatsAggregatorFactory { @@ -41,8 +49,51 @@ public class ColumnStatsAggregatorFactory { case DECIMAL_STATS: return new DecimalColumnStatsAggregator(); default: + throw new RuntimeException("Woh, bad. Unknown stats type " + type.toString()); + } + } + + public static ColumnStatisticsObj newColumnStaticsObj(String colName, _Fields type) { + ColumnStatisticsObj cso = new ColumnStatisticsObj(); + ColumnStatisticsData csd = new ColumnStatisticsData(); + cso.setColName(colName); + switch (type) { + case BOOLEAN_STATS: + csd.setBooleanStats(new BooleanColumnStatsData()); + cso.setColType("boolean"); + break; + + case LONG_STATS: + csd.setLongStats(new LongColumnStatsData()); + cso.setColType("long"); + break; + + case DOUBLE_STATS: + csd.setDoubleStats(new DoubleColumnStatsData()); + cso.setColType("double"); + break; + + case STRING_STATS: + csd.setStringStats(new StringColumnStatsData()); + cso.setColType("string"); + break; + + case BINARY_STATS: + csd.setBinaryStats(new BinaryColumnStatsData()); + cso.setColType("binary"); + break; + + case DECIMAL_STATS: + csd.setDecimalStats(new DecimalColumnStatsData()); + cso.setColType("decimal"); + break; + + default: throw new RuntimeException("Woh, bad. Unknown stats type!"); } + + cso.setStatsData(csd); + return cso; } } http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto ---------------------------------------------------------------------- diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto index 0aa0d21..3cd8867 100644 --- a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto +++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto @@ -22,6 +22,35 @@ enum PrincipalType { ROLE = 1; } +message AggrStats { + required int64 parts_found = 1; + repeated ColumnStats col_stats = 2; +} + +message AggrStatsBloomFilter { + message BloomFilter { + required int32 num_bits = 1; + required int32 num_funcs = 2; + repeated int64 bits = 3; + } + required bytes db_name = 1; + required bytes table_name = 2; + required BloomFilter bloom_filter = 3; + required int64 aggregated_at = 4; +} + +message AggrStatsInvalidatorFilter { + message Entry { + required bytes db_name = 1; + required bytes table_name = 2; + required bytes part_name = 3; + } + + repeated Entry to_invalidate = 1; + required int64 run_every = 2; + required int64 max_cache_entry_life = 3; +} + message ColumnStats { message BooleanStats { @@ -63,6 +92,7 @@ message ColumnStats { optional StringStats string_stats = 8; optional StringStats binary_stats = 9; optional DecimalStats decimal_stats = 10; + optional string column_name = 11; } message Database { 
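
A rough usage sketch (editorial note, not part of the patch) of how these new messages tie the pieces together: StatsCache.Invalidator batches queued Entry records into an AggrStatsInvalidatorFilter proto, and HBaseReadWrite.invalidateAggregatedStats wraps that proto in the HBase-side AggrStatsInvalidatorFilter filter shown at the top of this commit. The runEvery/maxCacheEntryLife values below are placeholders, and the snippet assumes the same imports the new classes already use (com.google.protobuf.ByteString, org.apache.hadoop.hbase.filter.Filter).

  HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry entry =
      HbaseMetastoreProto.AggrStatsInvalidatorFilter.Entry.newBuilder()
          .setDbName(ByteString.copyFrom("default".getBytes(HBaseUtils.ENCODING)))
          .setTableName(ByteString.copyFrom("hit".getBytes(HBaseUtils.ENCODING)))
          .setPartName(ByteString.copyFrom("ds=today".getBytes(HBaseUtils.ENCODING)))
          .build();
  HbaseMetastoreProto.AggrStatsInvalidatorFilter filterProto =
      HbaseMetastoreProto.AggrStatsInvalidatorFilter.newBuilder()
          .addToInvalidate(entry)
          .setRunEvery(5000)              // placeholder: invalidator period in milliseconds
          .setMaxCacheEntryLife(3600000)  // placeholder: max aggregate lifetime in milliseconds
          .build();
  // On the scan side the HBase filter is rebuilt from the serialized proto
  // (parseFrom throws DeserializationException):
  Filter hbaseFilter = AggrStatsInvalidatorFilter.parseFrom(filterProto.toByteArray());
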
http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java new file mode 100644 index 0000000..af8f5fc --- /dev/null +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseAggregateStatsCache.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hive.metastore.hbase; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.SortedMap; +import java.util.TreeMap; + +public class TestHBaseAggregateStatsCache { + private static final Log LOG = LogFactory.getLog(TestHBaseAggregateStatsCache.class.getName()); + + @Mock HTableInterface htable; + private HBaseStore store; + SortedMap<String, Cell> rows = new TreeMap<>(); + + @Before + public void before() throws IOException { + MockitoAnnotations.initMocks(this); + HiveConf conf = new HiveConf(); + conf.setBoolean(HBaseReadWrite.NO_CACHE_CONF, true); + store = MockUtils.init(conf, htable, rows); + store.backdoor().getStatsCache().resetCounters(); + } + + private static interface Checker { + void checkStats(AggrStats aggrStats) throws Exception; + } + + // Do to limitations in the Mock infrastructure we use for HBase testing we can only test + // this for a single column table and we 
can't really test hits in hbase, only in memory or + // build from scratch. But it's still useful to cover many bugs. More in depth testing with + // multiple columns and with HBase hits is done in TestHBaseAggrStatsCacheIntegration. + + @Test + public void allWithStats() throws Exception { + String dbName = "default"; + String tableName = "hit"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + for (List<String> partVals : Arrays.asList(partVals1, partVals2)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("boolean"); + ColumnStatisticsData data = new ColumnStatisticsData(); + BooleanColumnStatsData bcsd = new BooleanColumnStatsData(); + bcsd.setNumFalses(10); + bcsd.setNumTrues(20); + bcsd.setNumNulls(30); + data.setBooleanStats(bcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(2, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("boolean", cso.getColType()); + BooleanColumnStatsData bcsd = cso.getStatsData().getBooleanStats(); + Assert.assertEquals(20, bcsd.getNumFalses()); + Assert.assertEquals(40, bcsd.getNumTrues()); + Assert.assertEquals(60, bcsd.getNumNulls()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + // Call again, this time it should come from memory. Also, reverse the name order this time + // to assure that we still hit. 
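// (Editorial note, not part of this patch: the reversed order still hits because StatsCacheKey
// sorts partNames before feeding them to the MD5 digest, so ["ds=yesterday", "ds=today"] and
// ["ds=today", "ds=yesterday"] produce the same cache key.)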
+ aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + } + + + @Test + public void noneWithStats() throws Exception { + String dbName = "default"; + String tableName = "nws"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + for (List<String> partVals : Arrays.asList(partVals1, partVals2)) { + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/nws/ds=" + partVals.get(0)); + Partition part = new Partition(partVals, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + } + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(0, aggrStats.getPartsFound()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + } + + @Test + public void someNonexistentPartitions() throws Exception { + String dbName = "default"; + String tableName = "snp"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + StorageDescriptor psd = new StorageDescriptor(sd); + psd.setLocation("file:/tmp/default/hit/ds=" + partVals1.get(0)); + Partition part = new Partition(partVals1, dbName, tableName, (int) now, (int) now, psd, + Collections.<String, String>emptyMap()); + store.addPartition(part); + + ColumnStatistics cs = new ColumnStatistics(); + ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, dbName, tableName); + desc.setLastAnalyzed(now); + desc.setPartName("ds=" + partVals1.get(0)); + cs.setStatsDesc(desc); + ColumnStatisticsObj obj = new ColumnStatisticsObj(); + obj.setColName("col1"); + obj.setColType("double"); + ColumnStatisticsData data = new 
ColumnStatisticsData(); + DoubleColumnStatsData dcsd = new DoubleColumnStatsData(); + dcsd.setHighValue(1000.2342343); + dcsd.setLowValue(-20.1234213423); + dcsd.setNumNulls(30); + dcsd.setNumDVs(12342); + data.setDoubleStats(dcsd); + obj.setStatsData(data); + cs.addToStatsObj(obj); + + store.updatePartitionColumnStatistics(cs, partVals1); + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(1, aggrStats.getPartsFound()); + Assert.assertEquals(1, aggrStats.getColStatsSize()); + ColumnStatisticsObj cso = aggrStats.getColStats().get(0); + Assert.assertEquals("col1", cso.getColName()); + Assert.assertEquals("double", cso.getColType()); + DoubleColumnStatsData dcsd = cso.getStatsData().getDoubleStats(); + Assert.assertEquals(1000.23, dcsd.getHighValue(), 0.01); + Assert.assertEquals(-20.12, dcsd.getLowValue(), 0.01); + Assert.assertEquals(30, dcsd.getNumNulls()); + Assert.assertEquals(12342, dcsd.getNumDVs()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + + // Call again, this time it should come from memory. Also, reverse the name order this time + // to ensure that we still hit. + aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=yesterday", "ds=today"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(2, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + } + + @Test + public void nonexistentPartitions() throws Exception { + String dbName = "default"; + String tableName = "nep"; + List<String> partVals1 = Arrays.asList("today"); + List<String> partVals2 = Arrays.asList("yesterday"); + long now = System.currentTimeMillis(); + + List<FieldSchema> cols = new ArrayList<>(); + cols.add(new FieldSchema("col1", "boolean", "nocomment")); + SerDeInfo serde = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, + serde, null, null, Collections.<String, String>emptyMap()); + List<FieldSchema> partCols = new ArrayList<>(); + partCols.add(new FieldSchema("ds", "string", "")); + Table table = new Table(tableName, dbName, "me", (int)now, (int)now, 0, sd, partCols, + Collections.<String, String>emptyMap(), null, null, null); + store.createTable(table); + + Checker statChecker = new Checker() { + @Override + public void checkStats(AggrStats aggrStats) throws Exception { + Assert.assertEquals(0, aggrStats.getPartsFound()); + } + }; + + AggrStats aggrStats = store.get_aggr_stats_for(dbName, tableName, + Arrays.asList("ds=today", "ds=yesterday"), Arrays.asList("col1")); + statChecker.checkStats(aggrStats); + + // Check that we had to build it from the stats + Assert.assertEquals(0, store.backdoor().getStatsCache().hbaseHits.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().totalGets.getCnt()); + Assert.assertEquals(1, store.backdoor().getStatsCache().misses.getCnt()); + } + // TODO test
invalidation +} http://git-wip-us.apache.org/repos/asf/hive/blob/c53c6f45/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java index 92c9ba4..9878499 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseStore.java @@ -124,7 +124,7 @@ public class TestHBaseStore { @Rule public ExpectedException thrown = ExpectedException.none(); @Mock HTableInterface htable; - SortedMap<String, Cell> rows = new TreeMap<String, Cell>(); + SortedMap<String, Cell> rows = new TreeMap<>(); HBaseStore store;