http://git-wip-us.apache.org/repos/asf/hive/blob/26c0ab6a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index d28b196..d37b201 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -20,12 +20,12 @@ package org.apache.hadoop.hive.metastore.cache; import org.apache.hadoop.hive.metastore.api.CreationMetadata; import org.apache.hadoop.hive.metastore.api.ISchemaName; import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -35,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -95,8 +94,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.SchemaVersion; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.api.Type; @@ -124,130 +121,50 @@ import com.google.common.annotations.VisibleForTesting; // TODO constraintCache // TODO need sd nested copy? // TODO String intern -// TODO restructure HBaseStore // TODO monitor event queue // TODO initial load slow? // TODO size estimation -// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation public class CachedStore implements RawStore, Configurable { private static ScheduledExecutorService cacheUpdateMaster = null; - private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( - true); - private static ReentrantReadWriteLock partitionAggrColStatsCacheLock = - new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false); - private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); private static List<Pattern> whitelistPatterns = null; private static List<Pattern> blacklistPatterns = null; + // Default value set to 100 milliseconds for test purpose + private static long DEFAULT_CACHE_REFRESH_PERIOD = 100; + // Time after which metastore cache is updated from metastore DB by the background update thread + private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD; + private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false); private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; - // Default value set to 100 milliseconds for test purpose - private static long cacheRefreshPeriod = 100; - - /** A wrapper over SharedCache. Allows one to get SharedCache safely; should be merged - * into SharedCache itself (see the TODO on the class). */ - private static final SharedCacheWrapper sharedCacheWrapper = new SharedCacheWrapper(); + private static final SharedCache sharedCache = new SharedCache(); static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); - static class TableWrapper { - Table t; - String location; - Map<String, String> parameters; - byte[] sdHash; - TableWrapper(Table t, byte[] sdHash, String location, Map<String, String> parameters) { - this.t = t; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Table getTable() { - return t; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return location; - } - public Map<String, String> getParameters() { - return parameters; - } - } - - static class PartitionWrapper { - Partition p; - String location; - Map<String, String> parameters; - byte[] sdHash; - PartitionWrapper(Partition p, byte[] sdHash, String location, Map<String, String> parameters) { - this.p = p; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Partition getPartition() { - return p; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return location; - } - public Map<String, String> getParameters() { - return parameters; - } - } + public CachedStore() { - static class StorageDescriptorWrapper { - StorageDescriptor sd; - int refCount = 0; - StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { - this.sd = sd; - this.refCount = refCount; - } - public StorageDescriptor getSd() { - return sd; - } - public int getRefCount() { - return refCount; - } } - public CachedStore() { + @Override + public void setConf(Configuration conf) { + setConfInternal(conf); + initBlackListWhiteList(conf); + startCacheUpdateService(conf, false, true); } - public static void initSharedCacheAsync(Configuration conf) { - String clazzName = null; - boolean isEnabled = false; - try { - clazzName = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.RAW_STORE_IMPL); - isEnabled = JavaUtils.getClass(clazzName, RawStore.class).isAssignableFrom(CachedStore.class); - } catch (MetaException e) { - LOG.error("Cannot instantiate metastore class", e); - } - if (!isEnabled) { - LOG.debug("CachedStore is not enabled; using " + clazzName); - return; - } - sharedCacheWrapper.startInit(conf); + /** + * Similar to setConf but used from within the tests + * This does start the background thread for prewarm and update + * @param conf + */ + void setConfForTest(Configuration conf) { + setConfInternal(conf); + initBlackListWhiteList(conf); } - @Override - public void setConf(Configuration conf) { - String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, - ObjectStore.class.getName()); + private void setConfInternal(Configuration conf) { + String rawStoreClassName = + MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); if (rawStore == null) { try { rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance(); @@ -260,94 +177,145 @@ public class CachedStore implements RawStore, Configurable { this.conf = conf; if (expressionProxy != null && conf != oldConf) { LOG.warn("Unexpected setConf when we were already configured"); - } - if (expressionProxy == null || conf != oldConf) { + } else { expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); } - initBlackListWhiteList(conf); } @VisibleForTesting - static void prewarm(RawStore rawStore) throws Exception { - // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy - Deadline.registerIfNot(1000000); - List<String> dbNames = rawStore.getAllDatabases(); - LOG.info("Number of databases to prewarm: " + dbNames.size()); - SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); - for (int i = 0; i < dbNames.size(); i++) { - String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); - // Cache partition column stats - Deadline.startTimer("getColStatsForDatabase"); - List<ColStatsObjWithSourceInfo> colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - sharedCache.addPartitionColStatsToCache(colStatsForDB); + /** + * This initializes the caches in SharedCache by getting the objects from Metastore DB via + * ObjectStore and populating the respective caches + * + * @param rawStore + * @throws Exception + */ + static void prewarm(RawStore rawStore) { + if (isCachePrewarmed.get()) { + return; + } + long startTime = System.nanoTime(); + LOG.info("Prewarming CachedStore"); + while (!isCachePrewarmed.get()) { + // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + Deadline.registerIfNot(1000000); + List<String> dbNames; + try { + dbNames = rawStore.getAllDatabases(); + } catch (MetaException e) { + // Try again + continue; } - LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size()); - Database db = rawStore.getDatabase(dbName); - sharedCache.addDatabaseToCache(dbName, db); - List<String> tblNames = rawStore.getAllTables(dbName); - LOG.debug("Tables in database: {} : {}", dbName, tblNames); - for (int j = 0; j < tblNames.size(); j++) { - String tblName = StringUtils.normalizeIdentifier(tblNames.get(j)); - if (!shouldCacheTable(dbName, tblName)) { - LOG.info("Not caching database: {}'s table: {}", dbName, tblName); + LOG.info("Number of databases to prewarm: {}", dbNames.size()); + List<Database> databases = new ArrayList<>(dbNames.size()); + for (String dbName : dbNames) { + try { + databases.add(rawStore.getDatabase(dbName)); + } catch (NoSuchObjectException e) { + // Continue with next database continue; } - LOG.info("Caching database: {}'s table: {}. Cached {} / {} tables so far.", dbName, - tblName, j, tblNames.size()); - Table table = null; - table = rawStore.getTable(dbName, tblName); - // It is possible the table is deleted during fetching tables of the database, - // in that case, continue with the next table - if (table == null) { + } + sharedCache.populateDatabasesInCache(databases); + LOG.debug( + "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache"); + int numberOfDatabasesCachedSoFar = 0; + for (String dbName : dbNames) { + dbName = StringUtils.normalizeIdentifier(dbName); + List<String> tblNames; + try { + tblNames = rawStore.getAllTables(dbName); + } catch (MetaException e) { + // Continue with next database continue; } - sharedCache.addTableToCache(dbName, tblName, table); - if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { - Deadline.startTimer("getPartitions"); - List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - for (Partition partition : partitions) { - sharedCache.addPartitionToCache(dbName, tblName, partition); + int numberOfTablesCachedSoFar = 0; + for (String tblName : tblNames) { + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + continue; } - } - // Cache table column stats - List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(dbName, tblName, colNames); - Deadline.stopTimer(); - if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); - } - // Cache aggregate stats for all partitions of a table and for all but default partition - List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - if ((partNames != null) && (partNames.size() > 0)) { - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - // Remove default partition from partition names and get aggregate - // stats again - List<FieldSchema> partKeys = table.getPartitionKeys(); - String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List<String> partCols = new ArrayList<String>(); - List<String> partVals = new ArrayList<String>(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); + Table table; + try { + table = rawStore.getTable(dbName, tblName); + } catch (MetaException e) { + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + continue; } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + try { + ColumnStatistics tableColStats = null; + List<Partition> partitions = null; + List<ColumnStatistics> partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; + if (table.isSetPartitionKeys()) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + List<String> partNames = new ArrayList<>(partitions.size()); + for (Partition p : partitions) { + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); + } + if (!partNames.isEmpty()) { + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = + rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default + // partition + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List<FieldSchema> partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partCols = new ArrayList<>(); + List<String> partVals = new ArrayList<>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + } + } else { + Deadline.startTimer("getTableColumnStatistics"); + tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + } + sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + } catch (MetaException | NoSuchObjectException e) { + // Continue with next table + continue; + } + LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, ++numberOfTablesCachedSoFar, tblNames.size()); } + LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName, + ++numberOfDatabasesCachedSoFar, dbNames.size()); } + isCachePrewarmed.set(true); } - // Notify all blocked threads that prewarm is complete now - sharedCacheWrapper.notifyAllBlocked(); + LOG.info("CachedStore initialized"); + long endTime = System.nanoTime(); + LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); + sharedCache.completeTableCachePrewarm(); + } + + @VisibleForTesting + static void setCachePrewarmedState(boolean state) { + isCachePrewarmed.set(state); } private static void initBlackListWhiteList(Configuration conf) { @@ -356,20 +324,27 @@ public class CachedStore implements RawStore, Configurable { MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); - // The last specified blacklist pattern gets precedence - Collections.reverse(blacklistPatterns); } } @VisibleForTesting - synchronized static void startCacheUpdateService(Configuration conf) { + /** + * This starts a background thread, which initially populates the SharedCache and later + * periodically gets updates from the metastore db + * + * @param conf + * @param runOnlyOnce + * @param shouldRunPrewarm + */ + static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce, + boolean shouldRunPrewarm) { if (cacheUpdateMaster == null) { initBlackListWhiteList(conf); if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { - cacheRefreshPeriod = MetastoreConf.getTimeVar(conf, + cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS); } - LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriod); + LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS); cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -379,13 +354,20 @@ public class CachedStore implements RawStore, Configurable { return t; } }); - cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf), 0, cacheRefreshPeriod, + if (!runOnlyOnce) { + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, + cacheRefreshPeriodMS, TimeUnit.MILLISECONDS); + } + } + if (runOnlyOnce) { + // Some tests control the execution of the background update thread + cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, TimeUnit.MILLISECONDS); } } @VisibleForTesting - synchronized static boolean stopCacheUpdateService(long timeout) { + static synchronized boolean stopCacheUpdateService(long timeout) { boolean tasksStoppedBeforeShutdown = false; if (cacheUpdateMaster != null) { LOG.info("CachedStore: shutting down cache update service"); @@ -404,167 +386,83 @@ public class CachedStore implements RawStore, Configurable { @VisibleForTesting static void setCacheRefreshPeriod(long time) { - cacheRefreshPeriod = time; + cacheRefreshPeriodMS = time; } static class CacheUpdateMasterWork implements Runnable { - private boolean isFirstRun = true; + private boolean shouldRunPrewarm = true; private final RawStore rawStore; - public CacheUpdateMasterWork(Configuration conf) { - String rawStoreClassName = MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, - ObjectStore.class.getName()); + CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) { + this.shouldRunPrewarm = shouldRunPrewarm; + String rawStoreClassName = + MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); try { rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance(); rawStore.setConf(conf); } catch (InstantiationException | IllegalAccessException | MetaException e) { // MetaException here really means ClassNotFound (see the utility method). // So, if any of these happen, that means we can never succeed. - sharedCacheWrapper.updateInitState(e, true); throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); } } @Override public void run() { - if (isFirstRun) { - while (isFirstRun) { - try { - long startTime = System.nanoTime(); - LOG.info("Prewarming CachedStore"); - prewarm(rawStore); - LOG.info("CachedStore initialized"); - long endTime = System.nanoTime(); - LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); - } catch (Exception e) { - LOG.error("Prewarm failure", e); - sharedCacheWrapper.updateInitState(e, false); - return; - } - sharedCacheWrapper.updateInitState(null, false); - isFirstRun = false; - } - } else { + if (!shouldRunPrewarm) { // TODO: prewarm and update can probably be merged. update(); + } else { + try { + prewarm(rawStore); + } catch (Exception e) { + LOG.error("Prewarm failure", e); + return; + } } } - public void update() { + void update() { Deadline.registerIfNot(1000000); LOG.debug("CachedStore: updating cached objects"); + List<String> dbNames; try { - List<String> dbNames = rawStore.getAllDatabases(); - if (dbNames != null) { - // Update the database in cache - updateDatabases(rawStore, dbNames); - for (String dbName : dbNames) { - updateDatabasePartitionColStats(rawStore, dbName); - // Update the tables in cache - updateTables(rawStore, dbName); - List<String> tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); - for (String tblName : tblNames) { - if (!shouldCacheTable(dbName, tblName)) { - continue; - } - // Update the partitions for a table in cache - updateTablePartitions(rawStore, dbName, tblName); - // Update the table column stats for a table in cache - updateTableColStats(rawStore, dbName, tblName); - // Update aggregate column stats cache - updateAggregateStatsCache(rawStore, dbName, tblName); - } - } - } - } catch (Exception e) { - LOG.error("Updating CachedStore: error happen when refresh; ignoring", e); + dbNames = rawStore.getAllDatabases(); + } catch (MetaException e) { + LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e); + return; } - } - - private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) { - try { - Deadline.startTimer("getColStatsForDatabasePartitions"); - List<ColStatsObjWithSourceInfo> colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - if (partitionColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update; the partition column stats " - + "list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe() - .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB); - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}", - dbName, e); - } finally { - if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionColStatsCacheLock.writeLock().unlock(); + // Update the database in cache + updateDatabases(rawStore, dbNames); + for (String dbName : dbNames) { + // Update the tables in cache + updateTables(rawStore, dbName); + List<String> tblNames; + try { + tblNames = rawStore.getAllTables(dbName); + } catch (MetaException e) { + // Continue with next database + continue; } - } - } - - // Update cached aggregate stats for all partitions of a table and for all - // but default partition - private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) { - try { - Table table = rawStore.getTable(dbName, tblName); - List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); - if ((partNames != null) && (partNames.size() > 0)) { - Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate stats again - List<FieldSchema> partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List<String> partCols = new ArrayList<String>(); - List<String> partVals = new ArrayList<String>(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); - } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) { - if (partitionAggrColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug( - "Skipping aggregate column stats cache update; the aggregate column stats we " - + "have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); - } + for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, - e); - } finally { - if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionAggrColStatsCacheLock.writeLock().unlock(); + // Update the table column stats for a table in cache + updateTableColStats(rawStore, dbName, tblName); + // Update the partitions for a table in cache + updateTablePartitions(rawStore, dbName, tblName); + // Update the partition col stats for a table in cache + updateTablePartitionColStats(rawStore, dbName, tblName); + // Update aggregate partition column stats for a table in cache + updateTableAggregatePartitionColStats(rawStore, dbName, tblName); } } + sharedCache.incrementUpdateCount(); } private void updateDatabases(RawStore rawStore, List<String> dbNames) { - // Prepare the list of databases - List<Database> databases = new ArrayList<>(); + List<Database> databases = new ArrayList<>(dbNames.size()); for (String dbName : dbNames) { Database db; try { @@ -574,24 +472,9 @@ public class CachedStore implements RawStore, Configurable { LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); } } - // Update the cached database objects - try { - if (databaseCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshDatabases(databases); - } - } finally { - if (databaseCacheLock.isWriteLockedByCurrentThread()) { - databaseCacheLock.writeLock().unlock(); - } - } + sharedCache.refreshDatabasesInCache(databases); } - // Update the cached table objects private void updateTables(RawStore rawStore, String dbName) { List<Table> tables = new ArrayList<>(); try { @@ -600,81 +483,99 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { continue; } - Table table = - rawStore.getTable(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName)); tables.add(table); } - if (tableCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables); - } + sharedCache.refreshTablesInCache(dbName, tables); } catch (MetaException e) { - LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); - } finally { - if (tableCacheLock.isWriteLockedByCurrentThread()) { - tableCacheLock.writeLock().unlock(); + LOG.debug("Unable to refresh cached tables for database: " + dbName, e); + } + } + + private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + if (!table.isSetPartitionKeys()) { + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if (tableColStats != null) { + sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Unable to refresh table column stats for table: " + tblName, e); } } - // Update the cached partition objects for a table private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) { try { Deadline.startTimer("getPartitions"); List<Partition> partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); - if (partitionCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition cache update; the partition list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshPartitions( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partitions); - } + sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), partitions); } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } finally { - if (partitionCacheLock.isWriteLockedByCurrentThread()) { - partitionCacheLock.writeLock().unlock(); - } } } - // Update the cached col stats for this table - private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { try { Table table = rawStore.getTable(dbName, tblName); List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(dbName, tblName, colNames); + List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + List<ColumnStatistics> partitionColStats = + rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); Deadline.stopTimer(); - if (tableColStats != null) { - if (tableColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table column stats cache update; the table column stats list we " - + "have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTableColStats( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats); + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } + } + + // Update cached aggregate stats for all partitions of a table and for all + // but default partition + private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName, + String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + List<String> partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getAggregareStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate stats again + List<FieldSchema> partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partCols = new ArrayList<String>(); + List<String> partVals = new ArrayList<String>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); } } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e); - } finally { - if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) { - tableColStatsCacheLock.writeLock().unlock(); - } + LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, + e); } } } @@ -712,35 +613,17 @@ public class CachedStore implements RawStore, Configurable { @Override public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return; - } - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()), - db.deepCopy()); - } finally { - databaseCacheLock.readLock().unlock(); - } + sharedCache.addDatabaseToCache(db); } @Override public Database getDatabase(String dbName) throws NoSuchObjectException { - SharedCache sharedCache; - - if (!sharedCacheWrapper.isInitialized()) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getDatabase(dbName); } - - try { - sharedCache = sharedCacheWrapper.get(); - } catch (MetaException e) { - throw new RuntimeException(e); // TODO: why doesn't getDatabase throw MetaEx? - } - Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); + dbName = dbName.toLowerCase(); + Database db = + sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); if (db == null) { throw new NoSuchObjectException(); } @@ -748,68 +631,39 @@ public class CachedStore implements RawStore, Configurable { } @Override - public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaException { - boolean succ = rawStore.dropDatabase(dbname); + public boolean dropDatabase(String dbName) throws NoSuchObjectException, MetaException { + boolean succ = rawStore.dropDatabase(dbName); if (succ) { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname)); - } finally { - databaseCacheLock.readLock().unlock(); - } + dbName = dbName.toLowerCase(); + sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); } return succ; } @Override - public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException, - MetaException { + public boolean alterDatabase(String dbName, Database db) + throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); - } finally { - databaseCacheLock.readLock().unlock(); - } + dbName = dbName.toLowerCase(); + sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); } return succ; } @Override public List<String> getDatabases(String pattern) throws MetaException { - if (!sharedCacheWrapper.isInitialized()) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getDatabases(pattern); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List<String> results = new ArrayList<>(); - for (String dbName : sharedCache.listCachedDatabases()) { - dbName = StringUtils.normalizeIdentifier(dbName); - if (CacheUtils.matches(dbName, pattern)) { - results.add(dbName); - } - } - return results; + return sharedCache.listCachedDatabases(pattern); } @Override public List<String> getAllDatabases() throws MetaException { - if (!sharedCacheWrapper.isInitialized()) { + if (!sharedCache.isDatabaseCachePrewarmed()) { return rawStore.getAllDatabases(); } - SharedCache sharedCache = sharedCacheWrapper.get(); return sharedCache.listCachedDatabases(); } @@ -854,24 +708,13 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return; - } validateTableType(tbl); - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.addTableToCache(dbName, tblName, tbl); - } finally { - tableCacheLock.readLock().unlock(); - } + sharedCache.addTableToCache(dbName, tblName, tbl); } @Override - public boolean dropTable(String dbName, String tblName) throws MetaException, - NoSuchObjectException, InvalidObjectException, InvalidInputException { + public boolean dropTable(String dbName, String tblName) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { dbName = StringUtils.normalizeIdentifier(dbName); @@ -879,28 +722,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - // Remove table - try { - // Wait if background table cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove table col stats - try { - // Wait if background table col stats cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName); - } finally { - tableColStatsCacheLock.readLock().unlock(); - } + sharedCache.removeTableFromCache(dbName, tblName); } return succ; } @@ -909,11 +731,14 @@ public class CachedStore implements RawStore, Configurable { public Table getTable(String dbName, String tblName) throws MetaException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getTable(dbName, tblName); } - SharedCache sharedCache = sharedCacheWrapper.get(); Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // This table is not yet loaded in cache + return rawStore.getTable(dbName, tblName); + } if (tbl != null) { tbl.unsetPrivileges(); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); @@ -930,27 +755,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(dbName, tblName, part); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.addPartitionToCache(dbName, tblName, part); } return succ; } @@ -965,29 +770,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (Partition part : parts) { - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.addPartitionsToCache(dbName, tblName, parts); } return succ; } @@ -1002,30 +785,10 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + sharedCache.addPartitionToCache(dbName, tblName, part); } } return succ; @@ -1036,16 +799,13 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getPartition(dbName, tblName, part_vals); } - SharedCache sharedCache = sharedCacheWrapper.get(); - Partition part = - sharedCache.getPartitionFromCache(dbName, tblName, part_vals); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part == null) { - // TODO Manage privileges - throw new NoSuchObjectException("partition values=" + part_vals.toString()); + // The table containing the partition is not yet loaded in cache + return rawStore.getPartition(dbName, tblName, part_vals); } return part; } @@ -1055,10 +815,14 @@ public class CachedStore implements RawStore, Configurable { List<String> part_vals) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.doesPartitionExist(dbName, tblName, part_vals); + } + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table containing the partition is not yet loaded in cache return rawStore.doesPartitionExist(dbName, tblName, part_vals); } - SharedCache sharedCache = sharedCacheWrapper.get(); return sharedCache.existPartitionFromCache(dbName, tblName, part_vals); } @@ -1072,50 +836,40 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - // Remove partition - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.removePartitionFromCache(dbName, tblName, part_vals); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.removePartitionFromCache(dbName, tblName, part_vals); } return succ; } @Override + public void dropPartitions(String dbName, String tblName, List<String> partNames) + throws MetaException, NoSuchObjectException { + rawStore.dropPartitions(dbName, tblName, partNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return; + } + List<List<String>> partVals = new ArrayList<List<String>>(); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } + sharedCache.removePartitionsFromCache(dbName, tblName, partVals); + } + + @Override public List<Partition> getPartitions(String dbName, String tblName, int max) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitions(dbName, tblName, max); + } + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table containing the partitions is not yet loaded in cache return rawStore.getPartitions(dbName, tblName, max); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max); return parts; } @@ -1130,73 +884,20 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache return; } - - if (shouldCacheTable(dbName, newTblName)) { - validateTableType(newTable); - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); - } - } else { - // Remove the table and its cached partitions, stats etc, - // since it does not pass the whitelist/blacklist filter. - // Remove table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.removePartitionsFromCache(dbName, tblName); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Update aggregate partition col stats keys wherever applicable - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache and the new table can also be cached + sharedCache.alterTableInCache(dbName, tblName, newTable); + } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is *not* in the cache but the new table can be cached + sharedCache.addTableToCache(dbName, newTblName, newTable); + } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache but the new table *cannot* be cached + sharedCache.removeTableFromCache(dbName, tblName); } } @@ -1208,34 +909,21 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getTables(String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { return rawStore.getTables(dbName, pattern); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List<String> tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern)) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + (short) -1); } @Override public List<String> getTables(String dbName, String pattern, TableType tableType) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { return rawStore.getTables(dbName, pattern, tableType); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List<String> tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern) && - table.getTableType().equals(tableType.toString())) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + tableType); } @Override @@ -1248,10 +936,9 @@ public class CachedStore implements RawStore, Configurable { public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes) throws MetaException { // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { return rawStore.getTableMeta(dbNames, tableNames, tableTypes); } - SharedCache sharedCache = sharedCacheWrapper.get(); return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), StringUtils.normalizeIdentifier(tableNames), tableTypes); } @@ -1268,10 +955,9 @@ public class CachedStore implements RawStore, Configurable { break; } } - if (!sharedCacheWrapper.isInitialized() || missSomeInCache) { + if (!isCachePrewarmed.get() || missSomeInCache) { return rawStore.getTableObjectsByName(dbName, tblNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<Table> tables = new ArrayList<>(); for (String tblName : tblNames) { tblName = StringUtils.normalizeIdentifier(tblName); @@ -1286,38 +972,20 @@ public class CachedStore implements RawStore, Configurable { @Override public List<String> getAllTables(String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { return rawStore.getAllTables(dbName); } - SharedCache sharedCache = sharedCacheWrapper.get(); - return getAllTablesInternal(dbName, sharedCache); - } - - private static List<String> getAllTablesInternal(String dbName, SharedCache sharedCache) { - List<String> tblNames = new ArrayList<>(); - for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName())); - } - return tblNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName)); } @Override public List<String> listTableNamesByFilter(String dbName, String filter, short max_tables) throws MetaException, UnknownDBException { - if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List<String> tableNames = new ArrayList<>(); - int count = 0; - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; - } - } - return tableNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter, + max_tables); } @Override @@ -1325,16 +993,19 @@ public class CachedStore implements RawStore, Configurable { short max_parts) throws MetaException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionNames(dbName, tblName, max_parts); + } + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache return rawStore.listPartitionNames(dbName, tblName, max_parts); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<String> partitionNames = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) { if (max_parts == -1 || count < max_parts) { - partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); + partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues())); } } return partitionNames; @@ -1363,37 +1034,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return; - } - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } @Override @@ -1405,61 +1046,23 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return; - } - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List<String> partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List<String> partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts); } private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache) - throws MetaException, NoSuchObjectException { - List<Partition> parts = sharedCache.listCachedPartitions( - StringUtils.normalizeIdentifier(table.getDbName()), - StringUtils.normalizeIdentifier(table.getTableName()), maxParts); + throws MetaException, NoSuchObjectException { + List<Partition> parts = + sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()), + StringUtils.normalizeIdentifier(table.getTableName()), maxParts); for (Partition part : parts) { result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); } if (defaultPartName == null || defaultPartName.isEmpty()) { defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); } - return expressionProxy.filterPartitionsByExpr( - table.getPartitionKeys(), expr, defaultPartName, result); + return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, + result); } @Override @@ -1474,13 +1077,17 @@ public class CachedStore implements RawStore, Configurable { String defaultPartitionName, short maxParts, List<Partition> result) throws TException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, result); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<String> partNames = new LinkedList<>(); Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartitionName, maxParts, partNames, sharedCache); return hasUnknownPartitions; @@ -1497,13 +1104,16 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); } - SharedCache sharedCache = sharedCacheWrapper.get(); String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); List<String> partNames = new LinkedList<>(); Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); + } getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); return partNames.size(); @@ -1526,10 +1136,14 @@ public class CachedStore implements RawStore, Configurable { List<String> partNames) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionsByNames(dbName, tblName, partNames); + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionsByNames(dbName, tblName, partNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<Partition> partitions = new ArrayList<>(); for (String partName : partNames) { Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); @@ -1702,14 +1316,17 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException, InvalidObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals); - if (p!=null) { - Table t = sharedCache.getTableFromCache(dbName, tblName); - String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); + if (p != null) { + String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); p.setPrivileges(privs); @@ -1723,16 +1340,19 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException, InvalidObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); - Table t = sharedCache.getTableFromCache(dbName, tblName); List<Partition> partitions = new ArrayList<>(); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { if (maxParts == -1 || count < maxParts) { - String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); part.setPrivileges(privs); @@ -1749,13 +1369,16 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<String> partNames = new ArrayList<>(); int count = 0; - Table t = sharedCache.getTableFromCache(dbName, tblName); for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; for (int i=0;i<partVals.size();i++) { @@ -1770,7 +1393,7 @@ public class CachedStore implements RawStore, Configurable { continue; } if (maxParts == -1 || count < maxParts) { - partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); count++; } } @@ -1783,13 +1406,17 @@ public class CachedStore implements RawStore, Configurable { throws MetaException, InvalidObjectException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName, + groupNames); + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName, groupNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); List<Partition> partitions = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; @@ -1805,7 +1432,7 @@ public class CachedStore implements RawStore, Configurable { continue; } if (maxParts == -1 || count < maxParts) { - String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); part.setPrivileges(privs); @@ -1825,35 +1452,19 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return succ; } List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); - Table tbl = getTable(dbName, tblName); List<String> colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } - StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(dbName, tblName, tbl); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update table col stats - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); - } finally { - tableColStatsCacheLock.readLock().unlock(); - } + StatsSetupConst.setColumnStatsState(table.getParameters(), colNames); + sharedCache.alterTableInCache(dbName, tblName, table); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } return succ; } @@ -1863,24 +1474,18 @@ public class CachedStore implements RawStore, Configurable { List<String> colNames) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); - List<ColumnStatisticsObj> colStatObjs = new ArrayList<>(); - for (String colName : colNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); - ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); - if (colStat != null) { - colStatObjs.add(colStat); - } - } - if (colStatObjs.isEmpty()) { - return null; - } else { - return new ColumnStatistics(csd, colStatObjs); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); + List<ColumnStatisticsObj> colStatObjs = + sharedCache.getTableColStatsFromCache(dbName, tblName, colNames); + return new ColumnStatistics(csd, colStatObjs); } @Override @@ -1893,18 +1498,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); - } finally { - tableColStatsCacheLock.readLock().unlock(); - } + sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } return succ; } @@ -1919,10 +1513,6 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); Partition part = getPartition(dbName, tblName, partVals); List<String> colNames = new ArrayList<>(); @@ -1930,34 +1520,8 @@ public class CachedStore implements RawStore, Configurable { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - // Update partition - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, part); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, - colStats.getStatsObj()); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.alterPartitionInCache(dbName, tblName, partVals, part); + sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj()); } return succ; } @@ -1981,27 +1545,7 @@ public class CachedStore implements RawStore, Configurable { if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return succ; - } - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } return succ; } @@ -2012,10 +1556,14 @@ public class CachedStore implements RawStore, Configurable { List<ColumnStatisticsObj> colStats; dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } List<String> allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); if (partNames.size() == allPartNames.size()) { colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL); @@ -2054,10 +1602,8 @@ public class CachedStore implements RawStore, Configurable { List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new ArrayList<ColStatsObjWithSourceInfo>(); for (String partName : partNames) { - String colStatsCacheKey = - CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); ColumnStatisticsObj colStatsForPart = - sharedCache.getCachedPartitionColStats(colStatsCacheKey); + sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName); if (colStatsForPart != null) { ColStatsObjWithSourceInfo colStatsWithPartInfo = new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName); @@ -2173,54 +1719,6 @@ public class CachedStore implements RawStore, Configurable { } @Override - public void dropPartitions(String dbName, String tblName, List<String> partNames) - throws MetaException, NoSuchObjectException { - rawStore.dropPartitions(dbName, tblName, partNames); - dbName = StringUtils.normalizeIdentifier(dbName); - tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(dbName, tblName)) { - return; - } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return; - } - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (String partName : partNames) { - List<String> vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(dbName, tblName, vals); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (String partName : partNames) { - List<String> part_vals = partNameToVals(partName); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } -
<TRUNCATED>