Repository: hive Updated Branches: refs/heads/master 952fe6e17 -> d85beaa99
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 7beee42..6b6355b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -21,14 +21,18 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.TreeMap; -import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; @@ -38,17 +42,26 @@ import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapp import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; import org.apache.hadoop.hive.metastore.hbase.HBaseUtils; import org.apache.hive.common.util.HiveStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; public class SharedCache { private static Map<String, Database> databaseCache = new TreeMap<String, Database>(); private static Map<String, TableWrapper> tableCache = new 
TreeMap<String, TableWrapper>(); - private static Map<String, PartitionWrapper> partitionCache = new TreeMap<String, PartitionWrapper>(); - private static Map<String, ColumnStatisticsObj> partitionColStatsCache = new TreeMap<String, ColumnStatisticsObj>(); - private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>(); + private static Map<String, PartitionWrapper> partitionCache = + new TreeMap<String, PartitionWrapper>(); + private static Map<String, ColumnStatisticsObj> partitionColStatsCache = + new TreeMap<String, ColumnStatisticsObj>(); + private static Map<String, ColumnStatisticsObj> tableColStatsCache = + new TreeMap<String, ColumnStatisticsObj>(); + private static Map<ByteArrayWrapper, StorageDescriptorWrapper> sdCache = + new HashMap<ByteArrayWrapper, StorageDescriptorWrapper>(); private static MessageDigest md; + static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); + static { try { md = MessageDigest.getInstance("MD5"); @@ -97,11 +110,13 @@ public class SharedCache { Table tblCopy = tbl.deepCopy(); tblCopy.setDbName(HiveStringUtils.normalizeIdentifier(dbName)); tblCopy.setTableName(HiveStringUtils.normalizeIdentifier(tblName)); - for (FieldSchema fs : tblCopy.getPartitionKeys()) { - fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(HiveStringUtils.normalizeIdentifier(fs.getName())); + } } TableWrapper wrapper; - if (tbl.getSd()!=null) { + if (tbl.getSd() != null) { byte[] sdHash = HBaseUtils.hashStorageDescriptor(tbl.getSd(), md); StorageDescriptor sd = tbl.getSd(); increSd(sd, sdHash); @@ -121,10 +136,54 @@ public class SharedCache { } } + public static synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { + return tableColStatsCache.get(colStatsCacheKey); + } + + public static synchronized void 
removeTableColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + tableColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + public static synchronized void removeTableColStatsFromCache(String dbName, String tblName, + String colName) { + tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + } + + public static synchronized void updateTableColStatsInCache(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + // Get old stats object if present + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); + if (oldStatsObj != null) { + LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName() + + ", of table: " + tableName + " and database: " + dbName); + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + tableColStatsCache.put(key, colStatObj); + } + } + } + public static synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { removeTableFromCache(dbName, tblName); addTableToCache(HiveStringUtils.normalizeIdentifier(newTable.getDbName()), HiveStringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + } + + public static synchronized void alterTableInPartitionCache(String dbName, String tblName, + Table newTable) { if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { List<Partition> partitions = 
listCachedPartitions(dbName, tblName, -1); for (Partition part : partitions) { @@ -137,6 +196,58 @@ public class SharedCache { } } + public static synchronized void alterTableInTableColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + tableColStatsCache.entrySet().iterator(); + Map<String, ColumnStatisticsObj> newTableColStats = + new HashMap<String, ColumnStatisticsObj>(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { + String[] decomposedKey = CacheUtils.splitTableColStats(key); + String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); + newTableColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + tableColStatsCache.putAll(newTableColStats); + } + } + + public static synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, + Table newTable) { + if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { + List<Partition> partitions = listCachedPartitions(dbName, tblName, -1); + Map<String, ColumnStatisticsObj> newPartitionColStats = + new HashMap<String, ColumnStatisticsObj>(); + for (Partition part : partitions) { + String oldPartialPartitionKey = + CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if 
(key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey((String) decomposedKey[0], (String) decomposedKey[1], + (List<String>) decomposedKey[2], (String) decomposedKey[3]); + newPartitionColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + } + partitionColStatsCache.putAll(newPartitionColStats); + } + } + public static synchronized int getCachedTableCount() { return tableCache.size(); } @@ -151,18 +262,6 @@ public class SharedCache { return tables; } - public static synchronized void updateTableColumnStatistics(String dbName, String tableName, - List<ColumnStatisticsObj> statsObjs) { - Table tbl = getTableFromCache(dbName, tableName); - tbl.getSd().getParameters(); - List<String> colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj:statsObjs) { - colNames.add(statsObj.getColName()); - } - StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - alterTableInCache(dbName, tableName, tbl); - } - public static synchronized List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) { List<TableMeta> tableMetas = new ArrayList<TableMeta>(); for (String dbName : listCachedDatabases()) { @@ -214,14 +313,51 @@ public class SharedCache { return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); } - public static synchronized Partition removePartitionFromCache(String dbName, String tblName, List<String> part_vals) { - PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); - if (wrapper.getSdHash()!=null) { + public static synchronized Partition removePartitionFromCache(String dbName, String tblName, + List<String> part_vals) { + PartitionWrapper wrapper = + partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); + if (wrapper.getSdHash() != null) { decrSd(wrapper.getSdHash()); } return 
wrapper.getPartition(); } + // Remove cached column stats for all partitions of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List<String> partVals) { + String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + } + + // Remove cached column stats for a particular partition and a particular column of a table + public static synchronized void removePartitionColStatsFromCache(String dbName, String tblName, + List<String> partVals, String colName) { + partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); + } + public static synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) { List<Partition> partitions = new ArrayList<Partition>(); int count = 0; @@ -236,22 +372,53 @@ public class SharedCache { return partitions; } - public static synchronized void alterPartitionInCache(String dbName, String tblName, List<String> partVals, Partition newPart) { + public static synchronized void alterPartitionInCache(String dbName, String tblName, + 
List<String> partVals, Partition newPart) { removePartitionFromCache(dbName, tblName, partVals); addPartitionToCache(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart); } - public static synchronized void updatePartitionColumnStatistics(String dbName, String tableName, - List<String> partVals, List<ColumnStatisticsObj> statsObjs) { - Partition part = getPartitionFromCache(dbName, tableName, partVals); - part.getSd().getParameters(); - List<String> colNames = new ArrayList<>(); - for (ColumnStatisticsObj statsObj:statsObjs) { - colNames.add(statsObj.getColName()); + public static synchronized void alterPartitionInColStatsCache(String dbName, String tblName, + List<String> partVals, Partition newPart) { + String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); + Map<String, ColumnStatisticsObj> newPartitionColStats = + new HashMap<String, ColumnStatisticsObj>(); + Iterator<Entry<String, ColumnStatisticsObj>> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, ColumnStatisticsObj> entry = iterator.next(); + String key = entry.getKey(); + ColumnStatisticsObj colStatObj = entry.getValue(); + if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { + Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); + String newKey = + CacheUtils.buildKey(HiveStringUtils.normalizeIdentifier(newPart.getDbName()), + HiveStringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(), + (String) decomposedKey[3]); + newPartitionColStats.put(newKey, colStatObj); + iterator.remove(); + } + } + partitionColStatsCache.putAll(newPartitionColStats); + } + + public static synchronized void updatePartitionColStatsInCache(String dbName, String tableName, + List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) { + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + // Get old 
stats object if present + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + LOG.debug("CachedStore: updating partition column stats for column: " + + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName); + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + partitionColStatsCache.put(key, colStatObj); + } } - StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - alterPartitionInCache(dbName, tableName, partVals, part); } public static synchronized int getCachedPartitionCount() { @@ -262,10 +429,47 @@ public class SharedCache { return partitionColStatsCache.get(key); } - public static synchronized void addPartitionColStatsToCache(Map<String, ColumnStatisticsObj> aggrStatsPerPartition) { - partitionColStatsCache.putAll(aggrStatsPerPartition); + public static synchronized void addPartitionColStatsToCache(String dbName, String tableName, + Map<String, List<ColumnStatisticsObj>> colStatsPerPartition) { + for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStatsPerPartition.entrySet()) { + String partName = entry.getKey(); + try { + List<String> partVals = Warehouse.getPartValuesFromPartName(partName); + for (ColumnStatisticsObj colStatObj : entry.getValue()) { + String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); + partitionColStatsCache.put(key, colStatObj); + } + } catch (MetaException e) { + LOG.info("Unable to add partition: " + partName + " to SharedCache", e); + } + } } + public static synchronized void refreshPartitionColStats(String dbName, String tableName, + Map<String, List<ColumnStatisticsObj>> newColStatsPerPartition) { + LOG.debug("CachedStore: updating cached partition column stats objects for 
database: " + dbName + + " and table: " + tableName); + removePartitionColStatsFromCache(dbName, tableName); + addPartitionColStatsToCache(dbName, tableName, newColStatsPerPartition); + } + + public static synchronized void addTableColStatsToCache(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); + tableColStatsCache.put(key, colStatObj); + } + } + + public static synchronized void refreshTableColStats(String dbName, String tableName, + List<ColumnStatisticsObj> colStatsForTable) { + LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName + + " and table: " + tableName); + // Remove all old cache entries for this table + removeTableColStatsFromCache(dbName, tableName); + // Add new entries to cache + addTableColStatsToCache(dbName, tableName, colStatsForTable); + } public static void increSd(StorageDescriptor sd, byte[] sdHash) { ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash); @@ -295,6 +499,7 @@ public class SharedCache { // Replace databases in databaseCache with the new list public static synchronized void refreshDatabases(List<Database> databases) { + LOG.debug("CachedStore: updating cached database objects"); for (String dbName : listCachedDatabases()) { removeDatabaseFromCache(dbName); } @@ -305,6 +510,7 @@ public class SharedCache { // Replace tables in tableCache with the new list public static synchronized void refreshTables(String dbName, List<Table> tables) { + LOG.debug("CachedStore: updating cached table objects for database: " + dbName); for (Table tbl : listCachedTables(dbName)) { removeTableFromCache(dbName, tbl.getTableName()); } @@ -313,17 +519,18 @@ public class SharedCache { } } - public static void refreshPartitions(String dbName, String tblName, List<Partition> partitions) { - List<String> keysToRemove = new ArrayList<String>(); 
- for (Map.Entry<String, PartitionWrapper> entry : partitionCache.entrySet()) { - if (entry.getValue().getPartition().getDbName().equals(dbName) - && entry.getValue().getPartition().getTableName().equals(tblName)) { - keysToRemove.add(entry.getKey()); + public static synchronized void refreshPartitions(String dbName, String tblName, + List<Partition> partitions) { + LOG.debug("CachedStore: updating cached partition objects for database: " + dbName + + " and table: " + tblName); + Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator(); + while (iterator.hasNext()) { + PartitionWrapper partitionWrapper = iterator.next().getValue(); + if (partitionWrapper.getPartition().getDbName().equals(dbName) + && partitionWrapper.getPartition().getTableName().equals(tblName)) { + iterator.remove(); } } - for (String key : keysToRemove) { - partitionCache.remove(key); - } for (Partition part : partitions) { addPartitionToCache(dbName, tblName, part); } http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 0c7d8bb..a7681dd 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -2854,7 +2854,7 @@ public class HBaseStore implements RawStore { } @Override - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // TODO: see if it makes sense to implement this here return null; 
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java index da6cd46..fe890e4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/ColumnStatsMergerFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData._Fields; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; @@ -35,7 +36,7 @@ public class ColumnStatsMergerFactory { private ColumnStatsMergerFactory() { } - + // we depend on the toString() method for javolution.util.FastCollection. private static int countNumBitVectors(String s) { if (s != null) { @@ -88,8 +89,15 @@ public class ColumnStatsMergerFactory { numBitVectors = nbvNew == nbvOld ? nbvNew : 0; break; } + case DATE_STATS: { + agg = new DateColumnStatsMerger(); + int nbvNew = countNumBitVectors(statsObjNew.getStatsData().getDateStats().getBitVectors()); + int nbvOld = countNumBitVectors(statsObjOld.getStatsData().getDateStats().getBitVectors()); + numBitVectors = nbvNew == nbvOld ? nbvNew : 0; + break; + } default: - throw new RuntimeException("Woh, bad. 
Unknown stats type " + typeNew.toString()); + throw new IllegalArgumentException("Unknown stats type " + typeNew.toString()); } if (numBitVectors > 0) { agg.ndvEstimator = new NumDistinctValueEstimator(numBitVectors); @@ -127,8 +135,12 @@ public class ColumnStatsMergerFactory { csd.setDecimalStats(new DecimalColumnStatsData()); break; + case DATE_STATS: + csd.setDateStats(new DateColumnStatsData()); + break; + default: - throw new RuntimeException("Woh, bad. Unknown stats type!"); + throw new IllegalArgumentException("Unknown stats type"); } cso.setStatsData(csd); http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java new file mode 100644 index 0000000..3179b23 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/stats/merge/DateColumnStatsMerger.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.hive.metastore.hbase.stats.merge; + +import org.apache.hadoop.hive.metastore.NumDistinctValueEstimator; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.Date; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; + +public class DateColumnStatsMerger extends ColumnStatsMerger { + @Override + public void merge(ColumnStatisticsObj aggregateColStats, ColumnStatisticsObj newColStats) { + DateColumnStatsData aggregateData = aggregateColStats.getStatsData().getDateStats(); + DateColumnStatsData newData = newColStats.getStatsData().getDateStats(); + Date lowValue = + aggregateData.getLowValue().compareTo(newData.getLowValue()) < 0 ? aggregateData + .getLowValue() : newData.getLowValue(); + aggregateData.setLowValue(lowValue); + Date highValue = + aggregateData.getHighValue().compareTo(newData.getHighValue()) >= 0 ? aggregateData + .getHighValue() : newData.getHighValue(); + aggregateData.setHighValue(highValue); + aggregateData.setNumNulls(aggregateData.getNumNulls() + newData.getNumNulls()); + if (ndvEstimator == null || !newData.isSetBitVectors() || newData.getBitVectors().length() == 0) { + aggregateData.setNumDVs(Math.max(aggregateData.getNumDVs(), newData.getNumDVs())); + } else { + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(aggregateData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + ndvEstimator.mergeEstimators(new NumDistinctValueEstimator(newData.getBitVectors(), + ndvEstimator.getnumBitVectors())); + long ndv = ndvEstimator.estimateNumDistinctValues(); + LOG.debug("Use bitvector to merge column " + aggregateColStats.getColName() + "'s ndvs of " + + aggregateData.getNumDVs() + " and " + newData.getNumDVs() + " to be " + ndv); + aggregateData.setNumDVs(ndv); + aggregateData.setBitVectors(ndvEstimator.serialize().toString()); + } + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index f613c30..f53944f 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -872,7 +872,7 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { } @Override - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // TODO Auto-generated method stub return null; http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index 1720e37..e0f5cdb 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -888,7 +888,7 @@ public class DummyRawStoreForJdoConnection implements RawStore { } @Override - public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName, + public Map<String, List<ColumnStatisticsObj>> getColStatsForTablePartitions(String dbName, String tableName) throws MetaException, NoSuchObjectException { // 
TODO Auto-generated method stub return null; http://git-wip-us.apache.org/repos/asf/hive/blob/d85beaa9/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index 0ab20d6..7a3ec09 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -25,11 +25,22 @@ import java.util.Map; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.ObjectStore; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy; +import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.PrincipalType; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.Assert; import org.junit.Before; @@ -37,18 +48,24 @@ import org.junit.Test; public class TestCachedStore { - private CachedStore cachedStore = new 
CachedStore(); + private ObjectStore objectStore; + private CachedStore cachedStore; @Before public void setUp() throws Exception { HiveConf conf = new HiveConf(); - conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, MockPartitionExpressionProxy.class.getName()); - - ObjectStore objectStore = new ObjectStore(); + conf.setBoolean(HiveConf.ConfVars.HIVE_IN_TEST.varname, true); + conf.setVar(HiveConf.ConfVars.METASTORE_EXPRESSION_PROXY_CLASS, + MockPartitionExpressionProxy.class.getName()); + objectStore = new ObjectStore(); objectStore.setConf(conf); + cachedStore = new CachedStore(); + cachedStore.setConf(conf); + // Stop the CachedStore cache update service. We'll start it explicitly to control the test + cachedStore.stopCacheUpdateService(1); - cachedStore.setRawStore(objectStore); - + // Stop the CachedStore cache update service. We'll start it explicitly to control the test + cachedStore.stopCacheUpdateService(1); SharedCache.getDatabaseCache().clear(); SharedCache.getTableCache().clear(); SharedCache.getPartitionCache().clear(); @@ -56,6 +73,426 @@ public class TestCachedStore { SharedCache.getPartitionColStatsCache().clear(); } + /********************************************************************************************** + * Methods that test CachedStore + *********************************************************************************************/ + + @Test + public void testDatabaseOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testDatabaseOps"; + String dbDescription = "testDatabaseOps"; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<String, String>(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database via CachedStore + 
Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Add another db via CachedStore + final String dbName1 = "testDatabaseOps1"; + final String dbDescription1 = "testDatabaseOps1"; + Database db1 = new Database(dbName1, dbDescription1, dbLocation, dbParams); + db1.setOwnerName(dbOwner); + db1.setOwnerType(PrincipalType.USER); + cachedStore.createDatabase(db1); + db1 = cachedStore.getDatabase(dbName1); + + // Read db via ObjectStore + dbNew = objectStore.getDatabase(dbName1); + Assert.assertEquals(db1, dbNew); + + // Alter the db via CachedStore (can only alter owner or parameters) + db = new Database(dbName, dbDescription, dbLocation, dbParams); + dbOwner = "user2"; + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + cachedStore.alterDatabase(dbName, db); + db = cachedStore.getDatabase(dbName); + + // Read db via ObjectStore + dbNew = objectStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Add another db via ObjectStore + final String dbName2 = "testDatabaseOps2"; + final String dbDescription2 = "testDatabaseOps2"; + Database db2 = new Database(dbName2, dbDescription2, dbLocation, dbParams); + db2.setOwnerName(dbOwner); + db2.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db2); + db2 = objectStore.getDatabase(dbName2); + + // Alter db "testDatabaseOps" via ObjectStore + dbOwner = "user1"; + db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.alterDatabase(dbName, db); + db = objectStore.getDatabase(dbName); + + // Drop db "testDatabaseOps1" via ObjectStore + objectStore.dropDatabase(dbName1); + + // We update twice to accurately detect if cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read the newly added db via CachedStore + dbNew = cachedStore.getDatabase(dbName2); + Assert.assertEquals(db2, dbNew); + + // Read the 
altered db via CachedStore (altered user from "user2" to "user1") + dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + + // Try to read the dropped db after cache update + try { + dbNew = cachedStore.getDatabase(dbName1); + Assert.fail("The database: " + dbName1 + + " should have been removed from the cache after running the update service"); + } catch (NoSuchObjectException e) { + // Expected + } + + // Clean up + objectStore.dropDatabase(dbName); + objectStore.dropDatabase(dbName2); + } + + @Test + public void testTableOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testTableOps"; + String dbDescription = "testTableOps"; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<String, String>(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + String tblName = "tbl"; + String tblOwner = "user1"; + String serdeLocation = "file:/tmp"; + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + Map<String, String> serdeParams = new HashMap<String, String>(); + Map<String, String> tblParams = new HashMap<String, String>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<String, String>()); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = 
objectStore.getTable(dbName, tblName); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database, table via CachedStore + Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + Table tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + + // Add a new table via CachedStore + String tblName1 = "tbl1"; + Table tbl1 = + new Table(tblName1, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + cachedStore.createTable(tbl1); + tbl1 = cachedStore.getTable(dbName, tblName1); + + // Read via object store + tblNew = objectStore.getTable(dbName, tblName1); + Assert.assertEquals(tbl1, tblNew); + + // Add a new table via ObjectStore + String tblName2 = "tbl2"; + Table tbl2 = + new Table(tblName2, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl2); + tbl2 = objectStore.getTable(dbName, tblName2); + + // Alter table "tbl" via ObjectStore + tblOwner = "user2"; + tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.alterTable(dbName, tblName, tbl); + tbl = objectStore.getTable(dbName, tblName); + + // Drop table "tbl1" via ObjectStore + objectStore.dropTable(dbName, tblName1); + + // We update twice to accurately detect if cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read "tbl2" via CachedStore + tblNew = cachedStore.getTable(dbName, tblName2); + Assert.assertEquals(tbl2, tblNew); + + // Read the altered "tbl" via CachedStore + tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + + // Try to read the dropped "tbl1" via CachedStore (the test expects a null result, not an exception) + tblNew = cachedStore.getTable(dbName, tblName1); + 
Assert.assertNull(tblNew); + + // Should return "tbl" and "tbl2" + List<String> tblNames = cachedStore.getTables(dbName, "*"); + Assert.assertTrue(tblNames.contains(tblName)); + Assert.assertTrue(!tblNames.contains(tblName1)); + Assert.assertTrue(tblNames.contains(tblName2)); + + // Clean up + objectStore.dropTable(dbName, tblName); + objectStore.dropTable(dbName, tblName2); + objectStore.dropDatabase(dbName); + } + + @Test + public void testPartitionOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testPartitionOps"; + String dbDescription = "testPartitionOps"; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<String, String>(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + String tblName = "tbl"; + String tblOwner = "user1"; + String serdeLocation = "file:/tmp"; + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + Map<String, String> serdeParams = new HashMap<String, String>(); + Map<String, String> tblParams = new HashMap<String, String>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); + List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); + ptnCols.add(ptnCol1); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = 
objectStore.getTable(dbName, tblName); + final String ptnColVal1 = "aaa"; + Map<String, String> partParams = new HashMap<String, String>(); + Partition ptn1 = + new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn1); + ptn1 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); + final String ptnColVal2 = "bbb"; + Partition ptn2 = + new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn2); + ptn2 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read database, table, partition via CachedStore + Database dbNew = cachedStore.getDatabase(dbName); + Assert.assertEquals(db, dbNew); + Table tblNew = cachedStore.getTable(dbName, tblName); + Assert.assertEquals(tbl, tblNew); + Partition newPtn1 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1)); + Assert.assertEquals(ptn1, newPtn1); + Partition newPtn2 = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.assertEquals(ptn2, newPtn2); + + // Add a new partition via ObjectStore + final String ptnColVal3 = "ccc"; + Partition ptn3 = + new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, sd, partParams); + objectStore.addPartition(ptn3); + ptn3 = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + + // Alter an existing partition ("aaa") via ObjectStore + final String ptnColVal1Alt = "aaaAlt"; + Partition ptn1Atl = + new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, sd, partParams); + objectStore.alterPartition(dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl); + ptn1Atl = objectStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + + // Drop an existing partition ("bbb") via ObjectStore + objectStore.dropPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + + // We update twice to accurately detect if 
cache is dirty or not + updateCache(cachedStore, 100, 500, 100); + updateCache(cachedStore, 100, 500, 100); + + // Read the newly added partition via CachedStore + Partition newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal3)); + Assert.assertEquals(ptn3, newPtn); + + // Read the altered partition via CachedStore + newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal1Alt)); + Assert.assertEquals(ptn1Atl, newPtn); + + // Try to read the dropped partition via CachedStore + try { + newPtn = cachedStore.getPartition(dbName, tblName, Arrays.asList(ptnColVal2)); + Assert.fail("The partition: " + ptnColVal2 + + " should have been removed from the cache after running the update service"); + } catch (NoSuchObjectException e) { + // Expected + } + } + + //@Test + public void testTableColStatsOps() throws Exception { + // Add a db via ObjectStore + String dbName = "testTableColStatsOps"; + String dbDescription = "testTableColStatsOps"; + String dbLocation = "file:/tmp"; + Map<String, String> dbParams = new HashMap<String, String>(); + String dbOwner = "user1"; + Database db = new Database(dbName, dbDescription, dbLocation, dbParams); + db.setOwnerName(dbOwner); + db.setOwnerType(PrincipalType.USER); + objectStore.createDatabase(db); + db = objectStore.getDatabase(dbName); + + // Add a table via ObjectStore + final String tblName = "tbl"; + final String tblOwner = "user1"; + final String serdeLocation = "file:/tmp"; + final FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + // Stats values for col1 + long col1LowVal = 5; + long col1HighVal = 500; + long col1Nulls = 10; + long col1DV = 20; + final FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + // Stats values for col2 + long col2MaxColLen = 100; + double col2AvgColLen = 45.5; + long col2Nulls = 5; + long col2DV = 40; + final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column"); + // Stats values for col3 + long 
col3NumTrues = 100; + long col3NumFalses = 30; + long col3Nulls = 10; + final List<FieldSchema> cols = new ArrayList<FieldSchema>(); + cols.add(col1); + cols.add(col2); + cols.add(col3); + Map<String, String> serdeParams = new HashMap<String, String>(); + Map<String, String> tblParams = new HashMap<String, String>(); + final SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", null); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, + null, serdeParams); + Table tbl = + new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, new ArrayList<FieldSchema>(), tblParams, + null, null, TableType.MANAGED_TABLE.toString()); + objectStore.createTable(tbl); + tbl = objectStore.getTable(dbName, tblName); + + // Add ColumnStatistics for tbl to metastore DB via ObjectStore + ColumnStatistics stats = new ColumnStatistics(); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); + List<ColumnStatisticsObj> colStatObjs = new ArrayList<ColumnStatisticsObj>(); + + // Col1 + ColumnStatisticsData data1 = new ColumnStatisticsData(); + ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1); + LongColumnStatsData longStats = new LongColumnStatsData(); + longStats.setLowValue(col1LowVal); + longStats.setHighValue(col1HighVal); + longStats.setNumNulls(col1Nulls); + longStats.setNumDVs(col1DV); + data1.setLongStats(longStats); + colStatObjs.add(col1Stats); + + // Col2 + ColumnStatisticsData data2 = new ColumnStatisticsData(); + ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2); + StringColumnStatsData stringStats = new StringColumnStatsData(); + stringStats.setMaxColLen(col2MaxColLen); + stringStats.setAvgColLen(col2AvgColLen); + stringStats.setNumNulls(col2Nulls); + stringStats.setNumDVs(col2DV); + data2.setStringStats(stringStats); + colStatObjs.add(col2Stats); + + // Col3 + ColumnStatisticsData data3 = new 
ColumnStatisticsData(); + ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3); + BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); + boolStats.setNumTrues(col3NumTrues); + boolStats.setNumFalses(col3NumFalses); + boolStats.setNumNulls(col3Nulls); + data3.setBooleanStats(boolStats); + colStatObjs.add(col3Stats); + + stats.setStatsDesc(statsDesc); + stats.setStatsObj(colStatObjs); + + // Save to DB + objectStore.updateTableColumnStatistics(stats); + + // Prewarm CachedStore + cachedStore.prewarm(); + + // Read table stats via CachedStore + ColumnStatistics newStats = + cachedStore.getTableColumnStatistics(dbName, tblName, + Arrays.asList(col1.getName(), col2.getName(), col3.getName())); + Assert.assertEquals(stats, newStats); + } + + private void updateCache(CachedStore cachedStore, long frequency, long sleepTime, + long shutdownTimeout) throws InterruptedException { + // Use the caller-supplied refresh period (frequency, in ms) rather than a hard-coded value + cachedStore.setCacheRefreshPeriod(frequency); + // Start the CachedStore update service + cachedStore.startCacheUpdateService(); + // Sleep for sleepTime ms so that the cache update can complete + Thread.sleep(sleepTime); + // Stop the cache update service, passing the caller-supplied shutdownTimeout + cachedStore.stopCacheUpdateService(shutdownTimeout); + } + + /********************************************************************************************** + * Methods that test SharedCache + *********************************************************************************************/ + @Test public void testSharedStoreDb() { Database db1 = new Database(); @@ -160,6 +597,7 @@ public class TestCachedStore { Assert.assertEquals(SharedCache.getSdCache().size(), 2); } + @Test public void testSharedStorePartition() { Partition part1 = new Partition();