This is an automated email from the ASF dual-hosted git repository.

daijy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new c5e6608  HIVE-20613: CachedStore: Add more UT coverage (outside of .q files) (Vaibhav Gumashta, reviewed by Daniel Dai)
c5e6608 is described below

commit c5e6608246754380dd0de20be72b850fabe60df8
Author: Daniel Dai <dai...@gmail.com>
AuthorDate: Sun May 5 19:57:05 2019 -0700

    HIVE-20613: CachedStore: Add more UT coverage (outside of .q files) (Vaibhav Gumashta, reviewed by Daniel Dai)
---
 .../hadoop/hive/metastore/cache/CachedStore.java   |  205 +--
 .../hadoop/hive/metastore/cache/SharedCache.java   |   58 +-
 .../hive/metastore/cache/TestCachedStore.java      | 1354 ++++++++++++++------
 3 files changed, 1095 insertions(+), 522 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 1fac51e..a5d0c04 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -108,13 +109,18 @@ public class CachedStore implements RawStore, Configurable {
   private static long DEFAULT_CACHE_REFRESH_PERIOD = 100;
   // Time after which metastore cache is updated from metastore DB by the background update thread
   private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD;
+  private static int MAX_RETRIES = 10;
+
   // This is set to true only after prewarm is complete
   private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false);
+  // This is set to true only if we were able to cache all the metadata.
+  // We may not be able to cache all metadata if we hit CACHED_RAW_STORE_MAX_CACHE_MEMORY limit.
+ private static AtomicBoolean isCachedAllMetadata = new AtomicBoolean(false); private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); private RawStore rawStore = null; private Configuration conf; private static boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; - private static final SharedCache sharedCache = new SharedCache(); + private static SharedCache sharedCache = new SharedCache(); private static boolean canUseEvents = false; private static long lastEventId; @@ -196,8 +202,8 @@ public class CachedStore implements RawStore, Configurable { MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY); sharedCache.initialize(maxSharedCacheSizeInBytes); if (maxSharedCacheSizeInBytes > 0) { - LOG.info("Maximum memory that the cache will use: {} GB", - maxSharedCacheSizeInBytes / (1024 * 1024 * 1024)); + LOG.info("Maximum memory that the cache will use: {} KB", + maxSharedCacheSizeInBytes / (1024)); } } @@ -412,8 +418,7 @@ public class CachedStore implements RawStore, Configurable { Collection<String> catalogsToCache; try { catalogsToCache = catalogsToCache(rawStore); - LOG.info("Going to cache catalogs: " - + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); + LOG.info("Going to cache catalogs: " + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size()); for (String catName : catalogsToCache) { catalogs.add(rawStore.getCatalog(catName)); @@ -441,8 +446,7 @@ public class CachedStore implements RawStore, Configurable { databases.add(rawStore.getDatabase(catName, dbName)); } catch (NoSuchObjectException e) { // Continue with next database - LOG.warn("Failed to cache database " - + DatabaseName.getQualified(catName, dbName) + ", moving on", e); + LOG.warn("Failed to cache database " + DatabaseName.getQualified(catName, dbName) + ", moving on", e); } } } catch (MetaException e) { @@ -450,8 +454,7 @@ public class CachedStore implements RawStore, Configurable { } } sharedCache.populateDatabasesInCache(databases); - LOG.info( - "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache"); + LOG.info("Databases cache is now prewarmed. 
Now adding tables, partitions and statistics to the cache"); int numberOfDatabasesCachedSoFar = 0; for (Database db : databases) { String catName = StringUtils.normalizeIdentifier(db.getCatalogName()); @@ -460,8 +463,7 @@ public class CachedStore implements RawStore, Configurable { try { tblNames = rawStore.getAllTables(catName, dbName); } catch (MetaException e) { - LOG.warn("Failed to cache tables for database " - + DatabaseName.getQualified(catName, dbName) + ", moving on"); + LOG.warn("Failed to cache tables for database " + DatabaseName.getQualified(catName, dbName) + ", moving on"); // Continue with next database continue; } @@ -470,8 +472,7 @@ public class CachedStore implements RawStore, Configurable { int numberOfTablesCachedSoFar = 0; while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) { try { - String tblName = - StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm()); + String tblName = StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm()); if (!shouldCacheTable(catName, dbName, tblName)) { continue; } @@ -479,6 +480,7 @@ public class CachedStore implements RawStore, Configurable { try { table = rawStore.getTable(catName, dbName, tblName); } catch (MetaException e) { + LOG.debug(ExceptionUtils.getStackTrace(e)); // It is possible the table is deleted during fetching tables of the database, // in that case, continue with the next table continue; @@ -501,14 +503,13 @@ public class CachedStore implements RawStore, Configurable { if (!partNames.isEmpty()) { // Get partition column stats for this table Deadline.startTimer("getPartitionColumnStatistics"); - partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName, - tblName, partNames, colNames); + partitionColStats = + rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); Deadline.stopTimer(); // Get aggregate stats for all partitions of a table and for all but default // partition Deadline.startTimer("getAggrPartitionColumnStatistics"); - aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + aggrStatsAllPartitions = rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); Deadline.stopTimer(); // Remove default partition from partition names and get aggregate // stats again @@ -530,44 +531,48 @@ public class CachedStore implements RawStore, Configurable { } } else { Deadline.startTimer("getTableColumnStatistics"); - tableColStats = - rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); + tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); Deadline.stopTimer(); } // If the table could not cached due to memory limit, stop prewarm - boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, - partitionColStats, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); if (isSuccess) { LOG.trace("Cached Database: {}'s Table: {}.", dbName, tblName); } else { - LOG.info( - "Unable to cache Database: {}'s Table: {}, since the cache memory is full. " - + "Will stop attempting to cache any more tables.", - dbName, tblName); - completePrewarm(startTime); + LOG.info("Unable to cache Database: {}'s Table: {}, since the cache memory is full. 
" + + "Will stop attempting to cache any more tables.", dbName, tblName); + completePrewarm(startTime, false); return; } } catch (MetaException | NoSuchObjectException e) { + LOG.debug(ExceptionUtils.getStackTrace(e)); // Continue with next table continue; } - LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, - tblName, ++numberOfTablesCachedSoFar, totalTablesToCache); + LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, tblName, + ++numberOfTablesCachedSoFar, totalTablesToCache); } catch (EmptyStackException e) { // We've prewarmed this database, continue with the next one continue; } } - LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName, - ++numberOfDatabasesCachedSoFar, databases.size()); + LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName, ++numberOfDatabasesCachedSoFar, + databases.size()); } sharedCache.clearDirtyFlags(); - completePrewarm(startTime); + completePrewarm(startTime, true); } } - private static void completePrewarm(long startTime) { + @VisibleForTesting + static void clearSharedCache() { + sharedCache = new SharedCache(); + } + + static void completePrewarm(long startTime, boolean cachedAllMetadata) { isCachePrewarmed.set(true); + isCachedAllMetadata.set(cachedAllMetadata); LOG.info("CachedStore initialized"); long endTime = System.nanoTime(); LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); @@ -606,12 +611,10 @@ public class CachedStore implements RawStore, Configurable { } private static void initBlackListWhiteList(Configuration conf) { - if (whitelistPatterns == null || blacklistPatterns == null) { - whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf, - MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); - blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, - MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); - } + whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); + blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); } private static Collection<String> catalogsToCache(RawStore rs) throws MetaException { @@ -642,7 +645,7 @@ public class CachedStore implements RawStore, Configurable { cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS); } - LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS); + LOG.info("CachedStore: starting cache update service (run every {} ms)", cacheRefreshPeriodMS); cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -656,7 +659,7 @@ public class CachedStore implements RawStore, Configurable { cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, cacheRefreshPeriodMS, TimeUnit.MILLISECONDS); } - } + } if (runOnlyOnce) { // Some tests control the execution of the background update thread cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS); @@ -735,7 +738,8 @@ public class CachedStore implements RawStore, Configurable { void update() { Deadline.registerIfNot(1000000); - LOG.debug("CachedStore: updating cached objects"); + LOG.debug("CachedStore: updating cached objects. 
Shared cache has been update {} times so far.", + sharedCache.getUpdateCount()); try { for (String catName : catalogsToCache(rawStore)) { List<String> dbNames = rawStore.getAllDatabases(catName); @@ -748,6 +752,7 @@ public class CachedStore implements RawStore, Configurable { try { tblNames = rawStore.getAllTables(catName, dbName); } catch (MetaException e) { + LOG.debug(ExceptionUtils.getStackTrace(e)); // Continue with next database continue; } @@ -765,51 +770,64 @@ public class CachedStore implements RawStore, Configurable { updateTableAggregatePartitionColStats(rawStore, catName, dbName, tblName); } } - } - sharedCache.incrementUpdateCount(); + } + sharedCache.incrementUpdateCount(); + LOG.debug("CachedStore: updated cached objects. Shared cache update count is: {}", + sharedCache.getUpdateCount()); } catch (MetaException e) { LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e); } } - private void updateDatabases(RawStore rawStore, String catName, List<String> dbNames) { - // Prepare the list of databases - List<Database> databases = new ArrayList<>(); - for (String dbName : dbNames) { - Database db; - try { - db = rawStore.getDatabase(catName, dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database - " + catName + "." + dbName - + " does not exist.", e); + LOG.debug("CachedStore: updating cached database objects for catalog: {}", catName); + boolean success = false; + // Try MAX_RETRIES times, then move to next method + int maxTries = MAX_RETRIES; + while (!success && (maxTries-- > 0)) { + // Prepare the list of databases + List<Database> databases = new ArrayList<>(); + for (String dbName : dbNames) { + Database db; + try { + db = rawStore.getDatabase(catName, dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database: " + catName + "." 
+ dbName + " does not exist.", e); + } } + success = sharedCache.refreshDatabasesInCache(databases); + LOG.debug("CachedStore: updated cached database objects for catalog: {}", catName); } - sharedCache.refreshDatabasesInCache(databases); } private void updateTables(RawStore rawStore, String catName, String dbName) { - List<Table> tables = new ArrayList<>(); - try { - List<String> tblNames = rawStore.getAllTables(catName, dbName); - for (String tblName : tblNames) { - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; + LOG.debug("CachedStore: updating cached table objects for catalog: {}, database: {}", catName, dbName); + boolean success = false; + // Try MAX_RETRIES times, then move to next method + int maxTries = MAX_RETRIES; + while (!success && (maxTries-- > 0)) { + List<Table> tables = new ArrayList<>(); + try { + List<String> tblNames = rawStore.getAllTables(catName, dbName); + for (String tblName : tblNames) { + if (!shouldCacheTable(catName, dbName, tblName)) { + continue; + } + Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); + tables.add(table); } - Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - tables.add(table); + success = sharedCache.refreshTablesInCache(catName, dbName, tables); + LOG.debug("CachedStore: updated cached table objects for catalog: {}, database: {}", catName, dbName); + } catch (MetaException e) { + LOG.debug("Unable to refresh cached tables for database: " + dbName, e); } - sharedCache.refreshTablesInCache(catName, dbName, tables); - } catch (MetaException e) { - LOG.debug("Unable to refresh cached tables for database: " + dbName, e); } } - private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) { + LOG.debug("CachedStore: updating cached table col stats objects for catalog: {}, database: {}", catName, dbName); boolean committed = false; rawStore.openTransaction(); try { @@ -817,18 +835,18 @@ public class CachedStore implements RawStore, Configurable { if (table != null && !table.isSetPartitionKeys()) { List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); + ColumnStatistics tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); Deadline.stopTimer(); if (tableColStats != null) { sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), + tableColStats.getStatsObj()); // Update the table to get consistent stats state. 
sharedCache.alterTableInCache(catName, dbName, tblName, table); } } committed = rawStore.commitTransaction(); + LOG.debug("CachedStore: updated cached table col stats objects for catalog: {}, database: {}", catName, dbName); } catch (MetaException | NoSuchObjectException e) { LOG.info("Unable to refresh table column stats for table: " + tblName, e); } finally { @@ -840,19 +858,24 @@ public class CachedStore implements RawStore, Configurable { } private void updateTablePartitions(RawStore rawStore, String catName, String dbName, String tblName) { + LOG.debug("CachedStore: updating cached partition objects for catalog: {}, database: {}, table: {}", catName, + dbName, tblName); try { Deadline.startTimer("getPartitions"); List<Partition> partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partitions); + StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), partitions); + LOG.debug("CachedStore: updated cached partition objects for catalog: {}, database: {}, table: {}", catName, + dbName, tblName); } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); } } private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { + LOG.debug("CachedStore: updating cached partition col stats objects for catalog: {}, database: {}, table: {}", + catName, dbName, tblName); boolean committed = false; rawStore.openTransaction(); try { @@ -875,6 +898,8 @@ public class CachedStore implements RawStore, Configurable { } } committed = rawStore.commitTransaction(); + LOG.debug("CachedStore: updated cached partition col stats objects for catalog: {}, database: {}, table: {}", + catName, dbName, tblName); } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); } finally { @@ -888,7 +913,9 @@ public class CachedStore implements RawStore, Configurable { // Update cached aggregate stats for all partitions of a table and for all // but default partition private static void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, - String tblName) { + String tblName) { + LOG.debug("CachedStore: updating cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", + catName, dbName, tblName); try { Table table = rawStore.getTable(catName, dbName, tblName); if (table == null) { @@ -898,13 +925,11 @@ public class CachedStore implements RawStore, Configurable { List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); if ((partNames != null) && (partNames.size() > 0)) { Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + AggrStats aggrStatsAllPartitions = rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); Deadline.stopTimer(); // Remove default partition from partition names and get aggregate stats again List<FieldSchema> partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), 
ConfVars.DEFAULTPARTITIONNAME); List<String> partCols = new ArrayList<String>(); List<String> partVals = new ArrayList<String>(); for (FieldSchema fs : partKeys) { @@ -918,13 +943,13 @@ public class CachedStore implements RawStore, Configurable { rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); Deadline.stopTimer(); sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, aggrStatsAllButDefaultPartition, null); + LOG.debug("CachedStore: updated cached aggregate partition col stats objects for catalog: {}, database: {}, table: {}", + catName, dbName, tblName); } } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, - e); + LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, e); } } } @@ -1425,7 +1450,7 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String catName, String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTables(catName, dbName, pattern); } @@ -1436,7 +1461,7 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String catName, String dbName, String pattern, TableType tableType) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTables(catName, dbName, pattern, tableType); } @@ -1457,10 +1482,10 @@ public class CachedStore implements RawStore, Configurable { } @Override - public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, - List<String> tableTypes) throws MetaException { + public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, List<String> tableTypes) + throws MetaException { // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); } @@ -1509,7 +1534,7 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllTables(String catName, String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getAllTables(catName, dbName); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index a0636b6..05cf70b 100644 --- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -217,7 +217,7 @@ public class SharedCache { } } - boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache) { + boolean cachePartitions(Iterable<Partition> parts, SharedCache sharedCache, boolean fromPrewarm) { try { tableLock.writeLock().lock(); for (Partition part : parts) { @@ -238,7 +238,9 @@ public class SharedCache { LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes); } partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), ptnWrapper); - isPartitionCacheDirty.set(true); + if (!fromPrewarm) { + isPartitionCacheDirty.set(true); + } } // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { @@ -1145,17 +1147,18 @@ public class SharedCache { } } - public void refreshDatabasesInCache(List<Database> databases) { + public boolean refreshDatabasesInCache(List<Database> databases) { + if (isDatabaseCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping database cache update; the database list we have is dirty."); + return false; + } try { cacheLock.writeLock().lock(); - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return; - } databaseCache.clear(); for (Database db : databases) { addDatabaseToCache(db); } + return true; } finally { cacheLock.writeLock().unlock(); } @@ -1206,7 +1209,7 @@ public class SharedCache { } else { if (partitions != null) { // If the partitions were not added due to memory limit, return false - if (!tblWrapper.cachePartitions(partitions, this)) { + if (!tblWrapper.cachePartitions(partitions, this, true)) { return false; } } @@ -1442,23 +1445,23 @@ public class SharedCache { return tableNames; } - public void refreshTablesInCache(String catName, String dbName, List<Table> tables) { - try { - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return; - } - Map<String, TableWrapper> newCacheForDB = new TreeMap<>(); - for (Table tbl : tables) { - String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); - if (tblWrapper != null) { - tblWrapper.updateTableObj(tbl, this); - } else { - tblWrapper = createTableWrapper(catName, dbName, tblName, tbl); - } - newCacheForDB.put(CacheUtils.buildTableKey(catName, dbName, tblName), tblWrapper); + public boolean refreshTablesInCache(String catName, String dbName, List<Table> tables) { + if (isTableCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table cache update; the table list we have is dirty."); + return false; + } + Map<String, TableWrapper> newCacheForDB = new TreeMap<>(); + for (Table tbl : tables) { + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.updateTableObj(tbl, this); + } else { + tblWrapper = createTableWrapper(catName, dbName, tblName, tbl); } + newCacheForDB.put(CacheUtils.buildTableKey(catName, dbName, tblName), tblWrapper); + } + try { cacheLock.writeLock().lock(); Iterator<Entry<String, TableWrapper>> entryIterator = tableCache.entrySet().iterator(); while 
(entryIterator.hasNext()) { @@ -1468,10 +1471,9 @@ public class SharedCache { } } tableCache.putAll(newCacheForDB); + return true; } finally { - if (cacheLock.writeLock().isHeldByCurrentThread()) { - cacheLock.writeLock().unlock(); - } + cacheLock.writeLock().unlock(); } } @@ -1605,7 +1607,7 @@ public class SharedCache { cacheLock.readLock().lock(); TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { - tblWrapper.cachePartitions(parts, this); + tblWrapper.cachePartitions(parts, this, false); } } finally { cacheLock.readLock().unlock(); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index 8696c2f..8caa929 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ndv.hll.HyperLogLog; +import org.apache.hadoop.hive.metastore.Deadline; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; import org.apache.hadoop.hive.metastore.ObjectStore; @@ -35,24 +36,34 @@ import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.Date; +import org.apache.hadoop.hive.metastore.api.Decimal; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.utils.DecimalUtils; import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.columnstats.cache.DateColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.DecimalColumnStatsDataInspector; +import org.apache.hadoop.hive.metastore.columnstats.cache.DoubleColumnStatsDataInspector; import org.apache.hadoop.hive.metastore.columnstats.cache.LongColumnStatsDataInspector; import org.apache.hadoop.hive.metastore.columnstats.cache.StringColumnStatsDataInspector; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import 
org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -64,32 +75,80 @@ import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; @Category(MetastoreCheckinTest.class) public class TestCachedStore { - - private ObjectStore objectStore; - private CachedStore cachedStore; - private SharedCache sharedCache; - private Configuration conf; + // cs_db1 + Database db1; + // cs_db2 + Database db2; + // cs_db1_unptntbl1 + Table db1Utbl1; + // cs_db1_ptntbl1 + Table db1Ptbl1; + // cs_db2_unptntbl1 + Table db2Utbl1; + // cs_db2_ptntbl1 + Table db2Ptbl1; + // Partitions for cs_db1_ptntbl1 (a/1, a/2 ... e/4, e/5) + List<Partition> db1Ptbl1Ptns; + List<String> db1Ptbl1PtnNames; + // Partitions for cs_db2_ptntbl1 (a/1, a/2 ... e/4, e/5) + List<Partition> db2Ptbl1Ptns; + List<String> db2Ptbl1PtnNames; @Before public void setUp() throws Exception { - conf = MetastoreConf.newMetastoreConf(); + Deadline.registerIfNot(10000000); + Deadline.startTimer(""); + Configuration conf = MetastoreConf.newMetastoreConf(); MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); - // Disable memory estimation for this test class - MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); MetaStoreTestUtils.setConfForStandloneMode(conf); - objectStore = new ObjectStore(); + ObjectStore objectStore = new ObjectStore(); objectStore.setConf(conf); - cachedStore = new CachedStore(); - cachedStore.setConfForTest(conf); - // Stop the CachedStore cache update service. We'll start it explicitly to control the test - CachedStore.stopCacheUpdateService(1); - sharedCache = new SharedCache(); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); - // Create the 'hive' catalog HiveMetaStore.HMSHandler.createDefaultCatalog(objectStore, new Warehouse(conf)); + // Create 2 database objects + db1 = createDatabaseObject("cs_db1", "user1"); + objectStore.createDatabase(db1); + db2 = createDatabaseObject("cs_db2", "user1"); + objectStore.createDatabase(db2); + // For each database object, create one partitioned and one unpartitioned table + db1Utbl1 = createUnpartitionedTableObject(db1); + objectStore.createTable(db1Utbl1); + db1Ptbl1 = createPartitionedTableObject(db1); + objectStore.createTable(db1Ptbl1); + db2Utbl1 = createUnpartitionedTableObject(db2); + objectStore.createTable(db2Utbl1); + db2Ptbl1 = createPartitionedTableObject(db2); + objectStore.createTable(db2Ptbl1); + // Create partitions for cs_db1's partitioned table + db1Ptbl1Ptns = createPartitionObjects(db1Ptbl1).getPartitions(); + db1Ptbl1PtnNames = createPartitionObjects(db1Ptbl1).getPartitionNames(); + objectStore.addPartitions(db1Ptbl1.getCatName(), db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), db1Ptbl1Ptns); + // Create partitions for cs_db2's partitioned table + db2Ptbl1Ptns = createPartitionObjects(db2Ptbl1).getPartitions(); + db2Ptbl1PtnNames = createPartitionObjects(db2Ptbl1).getPartitionNames(); + objectStore.addPartitions(db2Ptbl1.getCatName(), db2Ptbl1.getDbName(), db2Ptbl1.getTableName(), db2Ptbl1Ptns); + objectStore.shutdown(); + } + + @After + public void teardown() throws Exception { + Deadline.startTimer(""); + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetaStoreTestUtils.setConfForStandloneMode(conf); + ObjectStore objectStore = new 
ObjectStore(); + objectStore.setConf(conf); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + Deadline.startTimer(""); + objectStore.dropPartitions(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), db1Ptbl1PtnNames); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName()); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + Deadline.startTimer(""); + objectStore.dropPartitions(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName(), db2Ptbl1PtnNames); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName()); + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, db1.getName()); + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, db2.getName()); + objectStore.shutdown(); } /********************************************************************************************** @@ -97,394 +156,581 @@ public class TestCachedStore { *********************************************************************************************/ @Test - public void testDatabaseOps() throws Exception { + public void testPrewarm() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + Assert.assertTrue(allDatabases.contains(db1.getName())); + Assert.assertTrue(allDatabases.contains(db2.getName())); + List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + Assert.assertTrue(db1Tables.contains(db1Utbl1.getTableName())); + Assert.assertTrue(db1Tables.contains(db1Ptbl1.getTableName())); + List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tables.size()); + Assert.assertTrue(db2Tables.contains(db2Utbl1.getTableName())); + Assert.assertTrue(db2Tables.contains(db2Ptbl1.getTableName())); + // cs_db1_ptntbl1 + List<Partition> db1Ptbl1Partitions = + cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db1.getName(), db1Ptbl1.getTableName(), -1); + Assert.assertEquals(25, db1Ptbl1Partitions.size()); + Deadline.startTimer(""); + List<Partition> db1Ptbl1PartitionsOS = + objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db1Ptbl1.getTableName(), -1); + Assert.assertTrue(db1Ptbl1Partitions.containsAll(db1Ptbl1PartitionsOS)); + // cs_db2_ptntbl1 + List<Partition> db2Ptbl1Partitions = + cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertEquals(25, db2Ptbl1Partitions.size()); + Deadline.startTimer(""); + List<Partition> db2Ptbl1PartitionsOS = + objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertTrue(db2Ptbl1Partitions.containsAll(db2Ptbl1PartitionsOS)); + cachedStore.shutdown(); + } + + @Test + public void testPrewarmBlackList() throws Exception { 
+ Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + // Don't cache tables from hive.cs_db2 + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST, "hive.cs_db2.*"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + SharedCache sharedCache = CachedStore.getSharedCache(); + // cachedStore.getAllTables falls back to objectStore when whitelist/blacklist is set + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + cachedStore.shutdown(); + } + + @Test + public void testPrewarmWhiteList() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + // Only cache tables from hive.cs_db1 + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST, "hive.cs_db1.*"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + SharedCache sharedCache = CachedStore.getSharedCache(); + // cachedStore.getAllTables falls back to objectStore when whitelist/blacklist is set + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + cachedStore.shutdown(); + } + + //@Test + // Note: the 44Kb approximation has been determined based on trial/error. + // If this starts failing on different env, might need another look. 
+ public void testPrewarmMemoryEstimation() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "44Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + SharedCache sharedCache = CachedStore.getSharedCache(); + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + cachedStore.shutdown(); + } + + @Test + public void testCacheUpdate() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + // Drop basedb1's unpartitioned table + objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + Deadline.startTimer(""); + // Drop a partitions of basedb1's partitioned table + objectStore.dropPartitions(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), db1Ptbl1PtnNames); + // Update SharedCache + updateCache(cachedStore); + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + Assert.assertTrue(allDatabases.contains(db1.getName())); + Assert.assertTrue(allDatabases.contains(db2.getName())); + // cs_db1_ptntbl1 + List<String> db1Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(1, db1Tbls.size()); + Assert.assertTrue(db1Tbls.contains(db1Ptbl1.getTableName())); + List<Partition> db1Ptns = + cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db1.getName(), db1Ptbl1.getTableName(), -1); + Assert.assertEquals(0, db1Ptns.size()); + // cs_db2_ptntbl1 + List<String> db2Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tbls.size()); + Assert.assertTrue(db2Tbls.contains(db2Utbl1.getTableName())); + Assert.assertTrue(db2Tbls.contains(db2Ptbl1.getTableName())); + List<Partition> db2Ptns = + cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertEquals(25, db2Ptns.size()); + Deadline.startTimer(""); + List<Partition> db2PtnsOS = + objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertTrue(db2Ptns.containsAll(db2PtnsOS)); + // Create a new unpartitioned table under basedb1 + Table db1Utbl2 = createUnpartitionedTableObject(db1); + db1Utbl2.setTableName(db1.getName() + "_unptntbl2"); + objectStore.createTable(db1Utbl2); + // Add a new partition to db1PartitionedTable + // 
Create partitions for cs_db1's partitioned table + db1Ptbl1Ptns = createPartitionObjects(db1Ptbl1).getPartitions(); + Deadline.startTimer(""); + objectStore.addPartition(db1Ptbl1Ptns.get(0)); + objectStore.addPartition(db1Ptbl1Ptns.get(1)); + objectStore.addPartition(db1Ptbl1Ptns.get(2)); + objectStore.addPartition(db1Ptbl1Ptns.get(3)); + objectStore.addPartition(db1Ptbl1Ptns.get(4)); + updateCache(cachedStore); + allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + Assert.assertTrue(allDatabases.contains(db1.getName())); + Assert.assertTrue(allDatabases.contains(db2.getName())); + db1Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tbls.size()); + Assert.assertTrue(db1Tbls.contains(db1Ptbl1.getTableName())); + Assert.assertTrue(db1Tbls.contains(db1Utbl2.getTableName())); + db2Tbls = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tbls.size()); + Assert.assertTrue(db2Tbls.contains(db2Utbl1.getTableName())); + Assert.assertTrue(db2Tbls.contains(db2Ptbl1.getTableName())); + // cs_db1_ptntbl1 + db1Ptns = cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db1.getName(), db1Ptbl1.getTableName(), -1); + Assert.assertEquals(5, db1Ptns.size()); + // cs_db2_ptntbl1 + db2Ptns = cachedStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertEquals(25, db2Ptns.size()); + Deadline.startTimer(""); + db2PtnsOS = objectStore.getPartitions(DEFAULT_CATALOG_NAME, db2.getName(), db2Ptbl1.getTableName(), -1); + Assert.assertTrue(db2Ptns.containsAll(db2PtnsOS)); + // Clean up + objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); + cachedStore.shutdown(); + } + + @Test + public void testCreateAndGetDatabase() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); // Add a db via ObjectStore - String dbName = "testDatabaseOps"; + String dbName = "testCreateAndGetDatabase"; String dbOwner = "user1"; - Database db = createTestDb(dbName, dbOwner); + Database db = createDatabaseObject(dbName, dbOwner); objectStore.createDatabase(db); db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); // Prewarm CachedStore CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); - // Read database via CachedStore Database dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); Assert.assertEquals(db, dbRead); - + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(3, allDatabases.size()); // Add another db via CachedStore - final String dbName1 = "testDatabaseOps1"; - Database db1 = createTestDb(dbName1, dbOwner); + String dbName1 = "testCreateAndGetDatabase1"; + Database db1 = createDatabaseObject(dbName1, dbOwner); cachedStore.createDatabase(db1); db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); + // Read db via ObjectStore + dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); + Assert.assertEquals(db1, dbRead); + allDatabases = 
cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(4, allDatabases.size()); + // Clean up + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1); + cachedStore.shutdown(); + } + @Test + public void testDropDatabase() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Add a db via ObjectStore + String dbName = "testDropDatabase"; + String dbOwner = "user1"; + Database db = createDatabaseObject(dbName, dbOwner); + objectStore.createDatabase(db); + db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + // Read database via CachedStore + Database dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + Assert.assertEquals(db, dbRead); + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(3, allDatabases.size()); + // Drop db via CachedStore + cachedStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); + // Read via ObjectStore + allDatabases = objectStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + // Create another db via CachedStore and drop via ObjectStore + String dbName1 = "testDropDatabase1"; + Database db1 = createDatabaseObject(dbName1, dbOwner); + cachedStore.createDatabase(db1); + db1 = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); // Read db via ObjectStore dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); Assert.assertEquals(db1, dbRead); + allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(3, allDatabases.size()); + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1); + updateCache(cachedStore); + updateCache(cachedStore); + allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + cachedStore.shutdown(); + } + @Test + public void testAlterDatabase() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + // Read database via CachedStore + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); // Alter the db via CachedStore (can only alter owner or parameters) - dbOwner = "user2"; - db = new Database(db); + String dbOwner = "user2"; + Database db = new Database(db1); db.setOwnerName(dbOwner); + String dbName = db1.getName(); cachedStore.alterDatabase(DEFAULT_CATALOG_NAME, dbName, db); db = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, 
dbName); - // Read db via ObjectStore - dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); + Database dbRead = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); Assert.assertEquals(db, dbRead); - - // Add another db via ObjectStore - final String dbName2 = "testDatabaseOps2"; - Database db2 = createTestDb(dbName2, dbOwner); - objectStore.createDatabase(db2); - db2 = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName2); - - // Alter db "testDatabaseOps" via ObjectStore - dbOwner = "user1"; - db = new Database(db); + // Alter db via ObjectStore + dbOwner = "user3"; + db = new Database(db1); db.setOwnerName(dbOwner); objectStore.alterDatabase(DEFAULT_CATALOG_NAME, dbName, db); db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - - // Drop db "testDatabaseOps1" via ObjectStore - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName1); - - // We update twice to accurately detect if cache is dirty or not updateCache(cachedStore); updateCache(cachedStore); - - // Read the newly added db via CachedStore - dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName2); - Assert.assertEquals(db2, dbRead); - - // Read the altered db via CachedStore (altered user from "user2" to "user1") + // Read db via CachedStore dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); Assert.assertEquals(db, dbRead); - - // Try to read the dropped db after cache update - try { - dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName1); - Assert.fail("The database: " + dbName1 - + " should have been removed from the cache after running the update service"); - } catch (NoSuchObjectException e) { - // Expected - } - - // Clean up - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName2); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); + cachedStore.shutdown(); } @Test - public void testTableOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testTableOps"; - String dbOwner = "user1"; - Database db = createTestDb(dbName, dbOwner); - objectStore.createDatabase(db); - db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - - // Add a table via ObjectStore - String tblName = "tbl"; - String tblOwner = "user1"; - FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(col1); - cols.add(col2); - List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); - Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); - objectStore.createTable(tbl); - tbl = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - + public void testCreateAndGetTable() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); // Prewarm CachedStore CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); - - // Read database, table via CachedStore - Database dbRead= cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - Assert.assertEquals(db, 
dbRead); - Table tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - Assert.assertEquals(tbl, tblRead); - - // Add a new table via CachedStore - String tblName1 = "tbl1"; - Table tbl1 = new Table(tbl); - tbl1.setTableName(tblName1); - cachedStore.createTable(tbl1); - tbl1 = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName1); - - // Read via object store - tblRead = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName1); - Assert.assertEquals(tbl1, tblRead); - - // Add a new table via ObjectStore - String tblName2 = "tbl2"; - Table tbl2 = new Table(tbl); - tbl2.setTableName(tblName2); - objectStore.createTable(tbl2); - tbl2 = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2); - - // Alter table "tbl" via ObjectStore - tblOwner = "role1"; - tbl.setOwner(tblOwner); - tbl.setOwnerType(PrincipalType.ROLE); - objectStore.alterTable(DEFAULT_CATALOG_NAME, dbName, tblName, tbl, null); - tbl = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - - Assert.assertEquals("Owner of the table did not change.", tblOwner, tbl.getOwner()); - Assert.assertEquals("Owner type of the table did not change", PrincipalType.ROLE, tbl.getOwnerType()); - - // Drop table "tbl1" via ObjectStore - objectStore.dropTable(DEFAULT_CATALOG_NAME, dbName, tblName1); - - // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore); + // Read database via CachedStore + List<String> allDatabases = cachedStore.getAllDatabases(DEFAULT_CATALOG_NAME); + Assert.assertEquals(2, allDatabases.size()); + List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tables.size()); + // Add a new table to db1 via CachedStore + // Create a new unpartitioned table under db1 + Table db1Utbl2 = createUnpartitionedTableObject(db1); + db1Utbl2.setTableName(db1.getName() + "_unptntbl2"); + cachedStore.createTable(db1Utbl2); + db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(3, db1Tables.size()); + db1Utbl2 = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); + Table tblRead = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); + Assert.assertEquals(db1Utbl2, tblRead); + // Create a new unpartitioned table under basedb2 via ObjectStore + Table db2Utbl2 = createUnpartitionedTableObject(db2); + db2Utbl2.setTableName(db2.getName() + "_unptntbl2"); + objectStore.createTable(db2Utbl2); + db2Utbl2 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); updateCache(cachedStore); - - // Read "tbl2" via CachedStore - tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName2); - Assert.assertEquals(tbl2, tblRead); - - // Read the altered "tbl" via CachedStore - tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - Assert.assertEquals(tbl, tblRead); - - // Try to read the dropped "tbl1" via CachedStore (should throw exception) - tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName1); - Assert.assertNull(tblRead); - - // Should return "tbl" and "tbl2" - List<String> tblNames = cachedStore.getTables(DEFAULT_CATALOG_NAME, dbName, "*"); - Assert.assertTrue(tblNames.contains(tblName)); - Assert.assertTrue(!tblNames.contains(tblName1)); - Assert.assertTrue(tblNames.contains(tblName2)); 
- + db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(3, db2Tables.size()); + tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); + Assert.assertEquals(db2Utbl2, tblRead); // Clean up - objectStore.dropTable(DEFAULT_CATALOG_NAME, dbName, tblName); - objectStore.dropTable(DEFAULT_CATALOG_NAME, dbName, tblName2); - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); + db1Utbl2 = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl2.getDbName(), db1Utbl2.getTableName()); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Utbl2.getDbName(), db2Utbl2.getTableName()); + cachedStore.shutdown(); } @Test - public void testPartitionOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testPartitionOps"; - String dbOwner = "user1"; - Database db = createTestDb(dbName, dbOwner); - objectStore.createDatabase(db); - db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - - // Add a table via ObjectStore - String tblName = "tbl"; - String tblOwner = "user1"; - FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - List<FieldSchema> cols = new ArrayList<FieldSchema>(); - cols.add(col1); - cols.add(col2); - FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); - List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); - ptnCols.add(ptnCol1); - Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); - objectStore.createTable(tbl); - tbl = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - - final String ptnColVal1 = "aaa"; - Map<String, String> partParams = new HashMap<String, String>(); - Partition ptn1 = - new Partition(Arrays.asList(ptnColVal1), dbName, tblName, 0, 0, tbl.getSd(), partParams); - ptn1.setCatName(DEFAULT_CATALOG_NAME); - objectStore.addPartition(ptn1); - ptn1 = objectStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); - ptn1.setCatName(DEFAULT_CATALOG_NAME); - final String ptnColVal2 = "bbb"; - Partition ptn2 = - new Partition(Arrays.asList(ptnColVal2), dbName, tblName, 0, 0, tbl.getSd(), partParams); - ptn2.setCatName(DEFAULT_CATALOG_NAME); - objectStore.addPartition(ptn2); - ptn2 = objectStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); - + // Note: the 44Kb approximation has been determined based on trial/error. + // If this starts failing on different env, might need another look. 
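The limit referenced in that note is handed to the metastore as a human-readable size string and resolved into a byte budget for the shared cache; this patch uses "44Kb" to force a partially warmed cache and "-1Kb" in the other tests, which appears to leave the cache effectively uncapped. A small sketch of reading the configured value back, using the same MetastoreConf APIs the tests already rely on:

    Configuration conf = MetastoreConf.newMetastoreConf();
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "44Kb");
    // getSizeVar resolves the "44Kb" string into a byte count. Prewarm stops caching new
    // objects once the budget is used up, which is why only db1's tables end up in the
    // SharedCache in the memory-limit test below.
    long budgetInBytes = MetastoreConf.getSizeVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY);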
+ public void testGetAllTablesPrewarmMemoryLimit() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "44Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); // Prewarm CachedStore CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); - - // Read database, table, partition via CachedStore - Database dbRead = cachedStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - Assert.assertEquals(db, dbRead); - Table tblRead = cachedStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - Assert.assertEquals(tbl, tblRead); - Partition ptn1Read = cachedStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1)); - Assert.assertEquals(ptn1, ptn1Read); - Partition ptn2Read = cachedStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); - Assert.assertEquals(ptn2, ptn2Read); - - // Add a new partition via ObjectStore - final String ptnColVal3 = "ccc"; - Partition ptn3 = - new Partition(Arrays.asList(ptnColVal3), dbName, tblName, 0, 0, tbl.getSd(), partParams); - ptn3.setCatName(DEFAULT_CATALOG_NAME); - objectStore.addPartition(ptn3); - ptn3 = objectStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); - - // Alter an existing partition ("aaa") via ObjectStore - final String ptnColVal1Alt = "aaaAlt"; - Partition ptn1Atl = - new Partition(Arrays.asList(ptnColVal1Alt), dbName, tblName, 0, 0, tbl.getSd(), partParams); - ptn1Atl.setCatName(DEFAULT_CATALOG_NAME); - objectStore.alterPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1), ptn1Atl, null); - ptn1Atl = objectStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); - - // Drop an existing partition ("bbb") via ObjectStore - objectStore.dropPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); - - // We update twice to accurately detect if cache is dirty or not - updateCache(cachedStore); - updateCache(cachedStore); - - // Read the newly added partition via CachedStore - Partition ptnRead = cachedStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); - Assert.assertEquals(ptn3, ptnRead); - - // Read the altered partition via CachedStore - ptnRead = cachedStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); - Assert.assertEquals(ptn1Atl, ptnRead); - - // Try to read the dropped partition via CachedStore - try { - ptnRead = cachedStore.getPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal2)); - Assert.fail("The partition: " + ptnColVal2 - + " should have been removed from the cache after running the update service"); - } catch (NoSuchObjectException e) { - // Expected - } - // Clean up - objectStore.dropPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal1Alt)); - objectStore.dropPartition(DEFAULT_CATALOG_NAME, dbName, tblName, Arrays.asList(ptnColVal3)); - objectStore.dropTable(DEFAULT_CATALOG_NAME, dbName, tblName); - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); + 
SharedCache sharedCache = CachedStore.getSharedCache(); + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + // The CachedStore call should fall back to ObjectStore in case of partial metadata caching + List<String> db2TablesCS = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2TablesCS.size()); + cachedStore.shutdown(); } - //@Test - public void testTableColStatsOps() throws Exception { - // Add a db via ObjectStore - String dbName = "testTableColStatsOps"; - String dbOwner = "user1"; - Database db = createTestDb(dbName, dbOwner); - objectStore.createDatabase(db); - db = objectStore.getDatabase(DEFAULT_CATALOG_NAME, dbName); - - // Add a table via ObjectStore - final String tblName = "tbl"; - final String tblOwner = "user1"; - final FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); - // Stats values for col1 - long col1LowVal = 5; - long col1HighVal = 500; - long col1Nulls = 10; - long col1DV = 20; - final FieldSchema col2 = new FieldSchema("col2", "string", "string column"); - // Stats values for col2 - long col2MaxColLen = 100; - double col2AvgColLen = 45.5; - long col2Nulls = 5; - long col2DV = 40; - final FieldSchema col3 = new FieldSchema("col3", "boolean", "boolean column"); - // Stats values for col3 - long col3NumTrues = 100; - long col3NumFalses = 30; - long col3Nulls = 10; - final List<FieldSchema> cols = new ArrayList<>(); - cols.add(col1); - cols.add(col2); - cols.add(col3); - FieldSchema ptnCol1 = new FieldSchema("part1", "string", "string partition column"); - List<FieldSchema> ptnCols = new ArrayList<FieldSchema>(); - ptnCols.add(ptnCol1); - Table tbl = createTestTbl(dbName, tblName, tblOwner, cols, ptnCols); - objectStore.createTable(tbl); - tbl = objectStore.getTable(DEFAULT_CATALOG_NAME, dbName, tblName); - - // Add ColumnStatistics for tbl to metastore DB via ObjectStore - ColumnStatistics stats = new ColumnStatistics(); - ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<>(); - - // Col1 - ColumnStatisticsData data1 = new ColumnStatisticsData(); - ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1); - LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); - longStats.setLowValue(col1LowVal); - longStats.setHighValue(col1HighVal); - longStats.setNumNulls(col1Nulls); - longStats.setNumDVs(col1DV); - data1.setLongStats(longStats); - colStatObjs.add(col1Stats); - - // Col2 - ColumnStatisticsData data2 = new ColumnStatisticsData(); - ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2); - StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector(); - stringStats.setMaxColLen(col2MaxColLen); - stringStats.setAvgColLen(col2AvgColLen); - stringStats.setNumNulls(col2Nulls); - stringStats.setNumDVs(col2DV); - data2.setStringStats(stringStats); - colStatObjs.add(col2Stats); - - // Col3 - ColumnStatisticsData data3 = new ColumnStatisticsData(); - ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3); - BooleanColumnStatsData boolStats = new BooleanColumnStatsData(); - boolStats.setNumTrues(col3NumTrues); - 
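The testGetAllTablesPrewarmMemoryLimit assertions earlier in this hunk show db2's two tables still being returned even though neither of them made it into the SharedCache; that relies on fall-through behaviour, where a request that cannot be answered from the cache is answered by the underlying store instead. A generic read-through sketch of the idea, illustrative only and not the actual CachedStore implementation:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    /** Serve a value from the cache when present, otherwise delegate to the backing store. */
    class ReadThroughLookup<K, V> {
      private final Map<K, V> cache = new ConcurrentHashMap<>();
      private final Function<K, V> backingStore;

      ReadThroughLookup(Function<K, V> backingStore) {
        this.backingStore = backingStore;
      }

      V get(K key) {
        V cached = cache.get(key);
        // Fall back to the slower store on a miss; a real cache would also re-populate here.
        return cached != null ? cached : backingStore.apply(key);
      }
    }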
boolStats.setNumFalses(col3NumFalses); - boolStats.setNumNulls(col3Nulls); - data3.setBooleanStats(boolStats); - colStatObjs.add(col3Stats); - - stats.setStatsDesc(statsDesc); - stats.setStatsObj(colStatObjs); + @Test + public void testGetAllTablesBlacklist() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + // Don't cache tables from hive.cs_db2 + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST, "hive.cs_db2.*"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + SharedCache sharedCache = CachedStore.getSharedCache(); + // cachedStore.getAllTables falls back to objectStore when whitelist/blacklist is set + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + List<String> db2TablesCS = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2TablesCS.size()); + cachedStore.shutdown(); + } - // Save to DB - objectStore.updateTableColumnStatistics(stats, null, -1); + @Test + public void testGetAllTablesWhitelist() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + // Only cache tables from hive.cs_db1 + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST, "hive.cs_db1.*"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + SharedCache sharedCache = CachedStore.getSharedCache(); + // cachedStore.getAllTables falls back to objectStore when whitelist/blacklist is set + List<String> db1Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = sharedCache.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(0, db2Tables.size()); + List<String> db2TablesCS = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2TablesCS.size()); + cachedStore.shutdown(); + } + @Test + public void testGetTableByPattern() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore 
objectStore = (ObjectStore) cachedStore.getRawStore(); // Prewarm CachedStore CachedStore.setCachePrewarmedState(false); CachedStore.prewarm(objectStore); + List<String> db1Tables = cachedStore.getTables(DEFAULT_CATALOG_NAME, db1.getName(), "cs_db1.*"); + Assert.assertEquals(2, db1Tables.size()); + db1Tables = cachedStore.getTables(DEFAULT_CATALOG_NAME, db1.getName(), "cs_db1.un*"); + Assert.assertEquals(1, db1Tables.size()); + db1Tables = cachedStore.getTables(DEFAULT_CATALOG_NAME, db1.getName(), ".*tbl1"); + Assert.assertEquals(2, db1Tables.size()); + cachedStore.shutdown(); + } - // Read table stats via CachedStore - ColumnStatistics newStats = - cachedStore.getTableColumnStatistics(DEFAULT_CATALOG_NAME, dbName, tblName, - Arrays.asList(col1.getName(), col2.getName(), col3.getName())); - Assert.assertEquals(stats, newStats); + @Test + public void testAlterTable() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tables.size()); + // Alter table db1Utbl1 via CachedStore and read via ObjectStore + Table db1Utbl1Read = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + String newOwner = "newOwner"; + Table db1Utbl1ReadAlt = new Table(db1Utbl1Read); + db1Utbl1ReadAlt.setOwner(newOwner); + cachedStore.alterTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName(), db1Utbl1ReadAlt, + "0"); + db1Utbl1Read = + cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1ReadAlt.getDbName(), db1Utbl1ReadAlt.getTableName()); + Table db1Utbl1ReadOS = + objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1ReadAlt.getDbName(), db1Utbl1ReadAlt.getTableName()); + Assert.assertEquals(db1Utbl1Read, db1Utbl1ReadOS); + // Alter table db2Utbl1 via ObjectStore and read via CachedStore + Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + Table db2Utbl1ReadAlt = new Table(db2Utbl1Read); + db2Utbl1ReadAlt.setOwner(newOwner); + objectStore.alterTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName(), db2Utbl1ReadAlt, + "0"); + updateCache(cachedStore); + db2Utbl1Read = + objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName()); + Table d21Utbl1ReadCS = + cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1ReadAlt.getDbName(), db2Utbl1ReadAlt.getTableName()); + Assert.assertEquals(db2Utbl1Read, d21Utbl1ReadCS); + cachedStore.shutdown(); + } - // Clean up - objectStore.dropTable(DEFAULT_CATALOG_NAME, dbName, tblName); - objectStore.dropDatabase(DEFAULT_CATALOG_NAME, dbName); - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); + @Test + public void testDropTable() throws Exception { + 
Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + List<String> db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List<String> db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tables.size()); + // Drop table db1Utbl1 via CachedStore and read via ObjectStore + Table db1Utbl1Read = cachedStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + cachedStore.dropTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName()); + db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(1, db1Tables.size()); + Table db1Utbl1ReadOS = + objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1Read.getDbName(), db1Utbl1Read.getTableName()); + Assert.assertNull(db1Utbl1ReadOS); + // Drop table db2Utbl1 via ObjectStore and read via CachedStore + Table db2Utbl1Read = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName()); + db2Tables = objectStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(1, db2Tables.size()); + updateCache(cachedStore); + db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(1, db2Tables.size()); + Table db2Utbl1ReadCS = + cachedStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1Read.getDbName(), db2Utbl1Read.getTableName()); + Assert.assertNull(db2Utbl1ReadCS); + cachedStore.shutdown(); } /********************************************************************************************** * Methods that test SharedCache + * @throws MetaException + * @throws NoSuchObjectException *********************************************************************************************/ @Test - public void testSharedStoreDb() { - Database db1 = createTestDb("db1", "user1"); - Database db2 = createTestDb("db2", "user1"); - Database db3 = createTestDb("db3", "user1"); - Database newDb1 = createTestDb("newdb1", "user1"); + public void testSharedStoreDb() throws NoSuchObjectException, MetaException { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + SharedCache sharedCache = CachedStore.getSharedCache(); + + Database db1 = createDatabaseObject("db1", "user1"); + Database db2 = createDatabaseObject("db2", "user1"); + Database db3 = createDatabaseObject("db3", "user1"); + Database newDb1 = createDatabaseObject("newdb1", "user1"); sharedCache.addDatabaseToCache(db1); sharedCache.addDatabaseToCache(db2); sharedCache.addDatabaseToCache(db3); @@ -497,10 
+743,20 @@ public class TestCachedStore { Assert.assertEquals(dbs.size(), 2); Assert.assertTrue(dbs.contains("newdb1")); Assert.assertTrue(dbs.contains("db3")); + cachedStore.shutdown(); } @Test public void testSharedStoreTable() { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + SharedCache sharedCache = CachedStore.getSharedCache(); + Table tbl1 = new Table(); StorageDescriptor sd1 = new StorageDescriptor(); List<FieldSchema> cols1 = new ArrayList<>(); @@ -573,16 +829,24 @@ public class TestCachedStore { sharedCache.removeTableFromCache(DEFAULT_CATALOG_NAME, "db1", "tbl2"); Assert.assertEquals(sharedCache.getCachedTableCount(), 2); Assert.assertEquals(sharedCache.getSdCache().size(), 2); + cachedStore.shutdown(); } - @Test public void testSharedStorePartition() { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + SharedCache sharedCache = CachedStore.getSharedCache(); String dbName = "db1"; String tbl1Name = "tbl1"; String tbl2Name = "tbl2"; String owner = "user1"; - Database db = createTestDb(dbName, owner); + Database db = createDatabaseObject(dbName, owner); sharedCache.addDatabaseToCache(db); FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); FieldSchema col2 = new FieldSchema("col2", "string", "string column"); @@ -660,31 +924,35 @@ public class TestCachedStore { sharedCache.alterPartitionInCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, Arrays.asList("201701"), newPart1); t = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, Arrays.asList("201701")); Assert.assertEquals(t.getSd().getLocation(), "loc1new"); + cachedStore.shutdown(); } - @Test + //@Test public void testAggrStatsRepeatedRead() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); String dbName = "testTableColStatsOps"; String tblName = "tbl"; String colName = "f1"; - Database db = new DatabaseBuilder() - .setName(dbName) - .setLocation("some_location") - .build(conf); + Database db = new DatabaseBuilder().setName(dbName).setLocation("some_location").build(conf); cachedStore.createDatabase(db); List<FieldSchema> cols = new ArrayList<>(); cols.add(new FieldSchema(colName, "int", null)); List<FieldSchema> partCols = new ArrayList<>(); partCols.add(new FieldSchema("col", "int", null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), - null, null, null); + StorageDescriptor sd 
= new StorageDescriptor(cols, null, "input", "output", false, 0, + new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null); - Table tbl = - new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), - null, null, TableType.MANAGED_TABLE.toString()); + Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), null, null, + TableType.MANAGED_TABLE.toString()); tbl.setCatName(DEFAULT_CATALOG_NAME); cachedStore.createTable(tbl); @@ -693,12 +961,10 @@ public class TestCachedStore { List<String> partVals2 = new ArrayList<>(); partVals2.add("2"); - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn1.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn2.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn2); @@ -732,10 +998,27 @@ public class TestCachedStore { Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); aggrStats = cachedStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, aggrPartVals, colNames); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); + + objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), + Warehouse.makePartName(tbl.getPartitionKeys(), partVals1), partVals1, colName); + objectStore.deletePartitionColumnStatistics(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), + Warehouse.makePartName(tbl.getPartitionKeys(), partVals2), partVals2, colName); + objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals1); + objectStore.dropPartition(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName(), partVals2); + objectStore.dropTable(DEFAULT_CATALOG_NAME, db.getName(), tbl.getTableName()) ; + objectStore.dropDatabase(DEFAULT_CATALOG_NAME, db.getName()); + cachedStore.shutdown(); } - @Test + //@Test public void testPartitionAggrStats() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); String dbName = "testTableColStatsOps1"; String tblName = "tbl1"; String colName = "f1"; @@ -748,13 +1031,11 @@ public class TestCachedStore { cols.add(new FieldSchema(colName, "int", null)); List<FieldSchema> partCols = new ArrayList<>(); partCols.add(new FieldSchema("col", "int", null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), - null, null, null); + StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0, + new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null); - Table tbl = - new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), - null, null, TableType.MANAGED_TABLE.toString()); + Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), null, null, + 
TableType.MANAGED_TABLE.toString()); tbl.setCatName(DEFAULT_CATALOG_NAME); cachedStore.createTable(tbl); @@ -763,12 +1044,10 @@ public class TestCachedStore { List<String> partVals2 = new ArrayList<>(); partVals2.add("2"); - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn1.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn2.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn2); @@ -806,10 +1085,19 @@ public class TestCachedStore { aggrStats = cachedStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, aggrPartVals, colNames); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 40); + cachedStore.shutdown(); } - @Test + //@Test public void testPartitionAggrStatsBitVector() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + String dbName = "testTableColStatsOps2"; String tblName = "tbl2"; String colName = "f1"; @@ -822,13 +1110,11 @@ public class TestCachedStore { cols.add(new FieldSchema(colName, "int", null)); List<FieldSchema> partCols = new ArrayList<>(); partCols.add(new FieldSchema("col", "int", null)); - StorageDescriptor sd = - new StorageDescriptor(cols, null, "input", "output", false, 0, new SerDeInfo("serde", "seriallib", new HashMap<>()), - null, null, null); + StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0, + new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null); - Table tbl = - new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), - null, null, TableType.MANAGED_TABLE.toString()); + Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(), null, null, + TableType.MANAGED_TABLE.toString()); tbl.setCatName(DEFAULT_CATALOG_NAME); cachedStore.createTable(tbl); @@ -837,12 +1123,10 @@ public class TestCachedStore { List<String> partVals2 = new ArrayList<>(); partVals2.add("2"); - Partition ptn1 = - new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn1.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn1); - Partition ptn2 = - new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); + Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>()); ptn2.setCatName(DEFAULT_CATALOG_NAME); cachedStore.addPartition(ptn2); @@ -894,10 +1178,20 @@ public class TestCachedStore { aggrStats = cachedStore.get_aggr_stats_for(DEFAULT_CATALOG_NAME, dbName, tblName, aggrPartVals, colNames); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumNulls(), 100); Assert.assertEquals(aggrStats.getColStats().get(0).getStatsData().getLongStats().getNumDVs(), 5); + 
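The multi-threaded test that follows drives the SharedCache from a pool of 50 threads by wrapping each mutation in a Callable and handing the whole batch to ExecutorService.invokeAll, which only returns once every task has completed. A stripped-down version of that scaffolding, with the cache mutation replaced by a placeholder (the daemon-thread factory is an assumption, since the factory body is elided from the hunk):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class InvokeAllSketch {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(50, r -> {
          Thread t = new Thread(r);
          t.setDaemon(true);
          return t;
        });
        List<Callable<Object>> tasks = new ArrayList<>();
        for (String dbName : new String[] {"db1", "db2", "db3", "db4", "db5"}) {
          tasks.add(() -> {
            // The real test adds a Database object to the SharedCache here.
            System.out.println("would cache " + dbName);
            return null;
          });
        }
        executor.invokeAll(tasks); // blocks until all tasks finish
        executor.shutdown();
      }
    }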
cachedStore.shutdown(); } @Test public void testMultiThreadedSharedCacheOps() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "-1Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTest(conf); + SharedCache sharedCache = CachedStore.getSharedCache(); + List<String> dbNames = new ArrayList<String>(Arrays.asList("db1", "db2", "db3", "db4", "db5")); List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(); ExecutorService executor = Executors.newFixedThreadPool(50, new ThreadFactory() { @@ -913,7 +1207,7 @@ public class TestCachedStore { for (String dbName : dbNames) { Callable<Object> c = new Callable<Object>() { public Object call() { - Database db = createTestDb(dbName, "user1"); + Database db = createDatabaseObject(dbName, "user1"); sharedCache.addDatabaseToCache(db); return null; } @@ -928,8 +1222,7 @@ public class TestCachedStore { } // Created 5 tables under "db1" - List<String> tblNames = - new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3", "tbl4", "tbl5")); + List<String> tblNames = new ArrayList<String>(Arrays.asList("tbl1", "tbl2", "tbl3", "tbl4", "tbl5")); tasks.clear(); for (String tblName : tblNames) { FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); @@ -965,8 +1258,8 @@ public class TestCachedStore { Map<String, String> partParams = new HashMap<String, String>(); Callable<Object> c = new Callable<Object>() { public Object call() { - Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, - tbl.getSd(), partParams); + Partition ptn = + new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, tbl.getSd(), partParams); sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, ptn); return null; } @@ -977,7 +1270,8 @@ public class TestCachedStore { executor.invokeAll(tasks); for (String tblName : tblNames) { for (String ptnVal : ptnVals) { - Partition ptn = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, Arrays.asList(ptnVal)); + Partition ptn = + sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, Arrays.asList(ptnVal)); Assert.assertNotNull(ptn); Assert.assertEquals(tblName, ptn.getTableName()); Assert.assertEquals(tblName, ptn.getTableName()); @@ -1007,8 +1301,8 @@ public class TestCachedStore { Map<String, String> partParams = new HashMap<String, String>(); Callable<Object> c = new Callable<Object>() { public Object call() { - Partition ptn = new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, - tbl.getSd(), partParams); + Partition ptn = + new Partition(Arrays.asList(ptnVal), dbNames.get(0), tblName, 0, 0, tbl.getSd(), partParams); sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, ptn); return null; } @@ -1019,7 +1313,8 @@ public class TestCachedStore { executor.invokeAll(tasks); for (String tblName : addPtnTblNames) { for (String ptnVal : newPtnVals) { - Partition ptn = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, Arrays.asList(ptnVal)); + Partition ptn = + sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, Arrays.asList(ptnVal)); Assert.assertNotNull(ptn); Assert.assertEquals(tblName, ptn.getTableName()); 
Assert.assertEquals(tblName, ptn.getTableName()); @@ -1030,12 +1325,25 @@ public class TestCachedStore { List<Partition> ptns = sharedCache.listCachedPartitions(DEFAULT_CATALOG_NAME, dbNames.get(0), tblName, 100); Assert.assertEquals(0, ptns.size()); } - sharedCache.getDatabaseCache().clear(); - sharedCache.getTableCache().clear(); - sharedCache.getSdCache().clear(); + cachedStore.shutdown(); } - private Database createTestDb(String dbName, String dbOwner) { + private Table createTestTbl(String dbName, String tblName, String tblOwner, List<FieldSchema> cols, + List<FieldSchema> ptnCols) { + String serdeLocation = "file:/tmp"; + Map<String, String> serdeParams = new HashMap<>(); + Map<String, String> tblParams = new HashMap<>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + tbl.setCatName(DEFAULT_CATALOG_NAME); + return tbl; + } + + private Database createDatabaseObject(String dbName, String dbOwner) { String dbDescription = dbName; String dbLocation = "file:/tmp"; Map<String, String> dbParams = new HashMap<>(); @@ -1047,30 +1355,268 @@ public class TestCachedStore { return db; } - private Table createTestTbl(String dbName, String tblName, String tblOwner, - List<FieldSchema> cols, List<FieldSchema> ptnCols) { + /** + * Create an unpartitioned table object for the given db. + * The table has 8 types of columns + * @param db + * @return + */ + private Table createUnpartitionedTableObject(Database db) { + String dbName = db.getName(); + String owner = db.getName(); String serdeLocation = "file:/tmp"; Map<String, String> serdeParams = new HashMap<>(); Map<String, String> tblParams = new HashMap<>(); SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); - StorageDescriptor sd = new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, - serdeInfo, null, null, serdeParams); + FieldSchema col1 = new FieldSchema("col1", "binary", "binary column"); + FieldSchema col2 = new FieldSchema("col2", "boolean", "boolean column"); + FieldSchema col3 = new FieldSchema("col3", "date", "date column"); + FieldSchema col4 = new FieldSchema("col4", "decimal", "decimal column"); + FieldSchema col5 = new FieldSchema("col5", "double", "double column"); + FieldSchema col6 = new FieldSchema("col6", "float", "float column"); + FieldSchema col7 = new FieldSchema("col7", "int", "int column"); + FieldSchema col8 = new FieldSchema("col8", "string", "string column"); + List<FieldSchema> cols = Arrays.asList(col1, col2, col3, col4, col5, col6, col7, col8); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, null, serdeParams); sd.setStoredAsSubDirectories(false); - Table tbl = new Table(tblName, dbName, tblOwner, 0, 0, 0, sd, ptnCols, tblParams, null, null, + Table tbl = new Table(dbName + "_unptntbl1", dbName, owner, 0, 0, 0, sd, new ArrayList<>(), tblParams, null, null, TableType.MANAGED_TABLE.toString()); tbl.setCatName(DEFAULT_CATALOG_NAME); + tbl.setWriteId(0); return tbl; } + private TableAndColStats createUnpartitionedTableObjectWithColStats(Database db) { + String dbName = db.getName(); + String owner = db.getName(); + String serdeLocation = "file:/tmp"; + Map<String, String>
serdeParams = new HashMap<>(); + Map<String, String> tblParams = new HashMap<>(); + SerDeInfo serdeInfo = new SerDeInfo("serde", "seriallib", new HashMap<>()); + FieldSchema col1 = new FieldSchema("col1", "binary", "binary column"); + // Stats values for col1 + long col1MaxColLength = 500; + double col1AvgColLength = 225.5; + long col1Nulls = 10; + FieldSchema col2 = new FieldSchema("col2", "boolean", "boolean column"); + long col2NumTrues = 100; + long col2NumFalses = 30; + long col2Nulls = 10; + FieldSchema col3 = new FieldSchema("col3", "date", "date column"); + Date col3LowVal = new Date(100); + Date col3HighVal = new Date(100000); + long col3Nulls = 10; + long col3DV = 20; + FieldSchema col4 = new FieldSchema("col4", "decimal", "decimal column"); + Decimal col4LowVal = DecimalUtils.getDecimal(3, 0); + Decimal col4HighVal = DecimalUtils.getDecimal(5, 0); + long col4Nulls = 10; + long col4DV = 20; + FieldSchema col5 = new FieldSchema("col5", "double", "double column"); + double col5LowVal = 10.5; + double col5HighVal = 550.5; + long col5Nulls = 10; + long col5DV = 20; + FieldSchema col6 = new FieldSchema("col6", "float", "float column"); + float col6LowVal = 10.5f; + float col6HighVal = 550.5f; + long col6Nulls = 10; + long col6DV = 20; + FieldSchema col7 = new FieldSchema("col7", "int", "int column"); + int col7LowVal = 10; + int col7HighVal = 550; + long col7Nulls = 10; + long col7DV = 20; + FieldSchema col8 = new FieldSchema("col8", "string", "string column"); + long col8MaxColLen = 100; + double col8AvgColLen = 45.5; + long col8Nulls = 5; + long col8DV = 40; + List<FieldSchema> cols = Arrays.asList(col1, col2, col3, col4, col5, col6, col7, col8); + StorageDescriptor sd = + new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serdeInfo, null, null, serdeParams); + sd.setStoredAsSubDirectories(false); + Table tbl = new Table(dbName + "_unptntbl1", dbName, owner, 0, 0, 0, sd, new ArrayList<>(), tblParams, null, null, + TableType.MANAGED_TABLE.toString()); + tbl.setCatName(DEFAULT_CATALOG_NAME); + tbl.setWriteId(0); + ColumnStatistics stats = new ColumnStatistics(); + ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tbl.getTableName()); + List<ColumnStatisticsObj> colStatObjList = new ArrayList<>(); + + // Col1 + ColumnStatisticsData data1 = new ColumnStatisticsData(); + ColumnStatisticsObj col1Stats = new ColumnStatisticsObj(col1.getName(), col1.getType(), data1); + BinaryColumnStatsData binaryStats = new BinaryColumnStatsData(); + binaryStats.setMaxColLen(col1MaxColLength); + binaryStats.setAvgColLen(col1AvgColLength); + binaryStats.setNumNulls(col1Nulls); + data1.setBinaryStats(binaryStats); + colStatObjList.add(col1Stats); + + // Col2 + ColumnStatisticsData data2 = new ColumnStatisticsData(); + ColumnStatisticsObj col2Stats = new ColumnStatisticsObj(col2.getName(), col2.getType(), data2); + BooleanColumnStatsData booleanStats = new BooleanColumnStatsData(); + booleanStats.setNumTrues(col2NumTrues); + booleanStats.setNumFalses(col2NumFalses); + booleanStats.setNumNulls(col2Nulls); + colStatObjList.add(col2Stats); + + // Col3 + ColumnStatisticsData data3 = new ColumnStatisticsData(); + ColumnStatisticsObj col3Stats = new ColumnStatisticsObj(col3.getName(), col3.getType(), data3); + DateColumnStatsDataInspector dateStats = new DateColumnStatsDataInspector(); + dateStats.setLowValue(col3LowVal); + dateStats.setHighValue(col3HighVal); + dateStats.setNumNulls(col3Nulls); + dateStats.setNumDVs(col3DV); + colStatObjList.add(col3Stats); + + // Col4 + 
ColumnStatisticsData data4 = new ColumnStatisticsData(); + ColumnStatisticsObj col4Stats = new ColumnStatisticsObj(col4.getName(), col4.getType(), data4); + DecimalColumnStatsDataInspector decimalStats = new DecimalColumnStatsDataInspector(); + decimalStats.setLowValue(col4LowVal); + decimalStats.setHighValue(col4HighVal); + decimalStats.setNumNulls(col4Nulls); + decimalStats.setNumDVs(col4DV); + colStatObjList.add(col4Stats); + + // Col5 + ColumnStatisticsData data5 = new ColumnStatisticsData(); + ColumnStatisticsObj col5Stats = new ColumnStatisticsObj(col5.getName(), col5.getType(), data5); + DoubleColumnStatsDataInspector doubleStats = new DoubleColumnStatsDataInspector(); + doubleStats.setLowValue(col5LowVal); + doubleStats.setHighValue(col5HighVal); + doubleStats.setNumNulls(col5Nulls); + doubleStats.setNumDVs(col5DV); + colStatObjList.add(col5Stats); + + // Col6 + ColumnStatisticsData data6 = new ColumnStatisticsData(); + ColumnStatisticsObj col6Stats = new ColumnStatisticsObj(col6.getName(), col6.getType(), data6); + DoubleColumnStatsDataInspector floatStats = new DoubleColumnStatsDataInspector(); + floatStats.setLowValue(col6LowVal); + floatStats.setHighValue(col6HighVal); + floatStats.setNumNulls(col6Nulls); + floatStats.setNumDVs(col6DV); + colStatObjList.add(col6Stats); + + // Col7 + ColumnStatisticsData data7 = new ColumnStatisticsData(); + ColumnStatisticsObj col7Stats = new ColumnStatisticsObj(col7.getName(), col7.getType(), data7); + LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector(); + longStats.setLowValue(col7LowVal); + longStats.setHighValue(col7HighVal); + longStats.setNumNulls(col7Nulls); + longStats.setNumDVs(col7DV); + colStatObjList.add(col7Stats); + + // Col8 + ColumnStatisticsData data8 = new ColumnStatisticsData(); + ColumnStatisticsObj col8Stats = new ColumnStatisticsObj(col8.getName(), col8.getType(), data8); + StringColumnStatsDataInspector stringStats = new StringColumnStatsDataInspector(); + stringStats.setMaxColLen(col8MaxColLen); + stringStats.setAvgColLen(col8AvgColLen); + stringStats.setNumNulls(col8Nulls); + stringStats.setNumDVs(col8DV); + colStatObjList.add(col8Stats); + + stats.setStatsDesc(statsDesc); + stats.setStatsObj(colStatObjList); + + return new TableAndColStats(tbl, stats); + } + + class TableAndColStats { + Table table; + ColumnStatistics colStats; + + TableAndColStats(Table table, ColumnStatistics colStats) { + this.table = table; + this.colStats = colStats; + } + } + + /** + * Create a partitioned table object for the given db. + * The table has 8 types of columns. + * The partition columns are string and integer + * @param db + * @return + */ + private Table createPartitionedTableObject(Database db) { + FieldSchema ptnCol1 = new FieldSchema("partCol1", "string", "string partition column"); + FieldSchema ptnCol2 = new FieldSchema("partCol2", "int", "integer partition column"); + List<FieldSchema> ptnCols = Arrays.asList(ptnCol1, ptnCol2); + Table tbl = createUnpartitionedTableObject(db); + tbl.setTableName(db.getName() + "_ptntbl1"); + tbl.setPartitionKeys(ptnCols); + return tbl; + } + + /** + * Create 25 partition objects for table returned by createPartitionedTableObject + * Partitions are: a/1, a/2, ... 
e/4, e/5 + * @param table + * @return + */ + private PartitionObjectsAndNames createPartitionObjects(Table table) { + List<String> partColNames = new ArrayList<>(); + for (FieldSchema col : table.getPartitionKeys()) { + partColNames.add(col.getName()); + } + List<Partition> ptns = new ArrayList<>(); + List<String> ptnNames = new ArrayList<>(); + String dbName = table.getDbName(); + String tblName = table.getTableName(); + StorageDescriptor sd = table.getSd(); + List<String> ptnCol1Vals = Arrays.asList("a", "b", "c", "d", "e"); + List<String> ptnCol2Vals = Arrays.asList("1", "2", "3", "4", "5"); + for (String ptnCol1Val : ptnCol1Vals) { + for (String ptnCol2Val : ptnCol2Vals) { + List<String> partVals = Arrays.asList(ptnCol1Val, ptnCol2Val); + Partition ptn = new Partition(partVals, dbName, tblName, 0, 0, sd, new HashMap<String, String>()); + ptn.setCatName(DEFAULT_CATALOG_NAME); + ptns.add(ptn); + ptnNames.add(FileUtils.makePartName(partColNames, partVals)); + } + } + return new PartitionObjectsAndNames(ptns, ptnNames); + } + + class PartitionObjectsAndNames { + List<Partition> ptns; + List<String> ptnNames; + + PartitionObjectsAndNames(List<Partition> ptns, List<String> ptnNames) { + this.ptns = ptns; + this.ptnNames = ptnNames; + } + + List<Partition> getPartitions() { + return ptns; + } + + List<String> getPartitionNames() { + return ptnNames; + } + } + // This method will return only after the cache has updated once - private void updateCache(CachedStore cachedStore) throws InterruptedException { - int maxTries = 100000; + private void updateCache(CachedStore cachedStore) throws Exception { + int maxTries = 100; long updateCountBefore = cachedStore.getCacheUpdateCount(); // Start the CachedStore update service CachedStore.startCacheUpdateService(cachedStore.getConf(), true, false); while ((cachedStore.getCacheUpdateCount() != (updateCountBefore + 1)) && (maxTries-- > 0)) { Thread.sleep(1000); } + if (maxTries <= 0) { + throw new Exception("Unable to update SharedCache in 100 attempts; possibly some bug"); + } CachedStore.stopCacheUpdateService(100); } }
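One detail worth keeping in mind when extending createUnpartitionedTableObjectWithColStats: ColumnStatisticsData is a Thrift union, so a ColumnStatisticsObj only carries usable numbers once the matching union field has been set, as the helper does for col1 with data1.setBinaryStats(binaryStats). A compact sketch of building one long-typed statistics object end to end, using the same metastore classes this test already imports (the column, database and table names are made up):

    ColumnStatisticsData data = new ColumnStatisticsData();
    LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
    longStats.setLowValue(1);
    longStats.setHighValue(100);
    longStats.setNumNulls(0);
    longStats.setNumDVs(50);
    data.setLongStats(longStats); // set the union field before handing the data object around
    ColumnStatisticsObj statsObj = new ColumnStatisticsObj("some_int_col", "int", data);

    ColumnStatistics stats = new ColumnStatistics();
    stats.setStatsDesc(new ColumnStatisticsDesc(true, "some_db", "some_table"));
    stats.setStatsObj(Arrays.asList(statsObj));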