http://git-wip-us.apache.org/repos/asf/hive/blob/651e7950/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --cc standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 0000000,8ff056f..9bee0db mode 000000,100644..100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@@ -1,0 -1,2532 +1,2532 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hive.metastore.cache; + + + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; + import java.util.EmptyStackException; + import java.util.HashMap; + import java.util.LinkedList; + import java.util.List; + import java.util.Map; + import java.util.Stack; + import java.util.concurrent.Executors; + import java.util.concurrent.ScheduledExecutorService; + import java.util.concurrent.ThreadFactory; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.regex.Matcher; + import java.util.regex.Pattern; + + import org.apache.hadoop.conf.Configurable; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hive.common.DatabaseName; + import org.apache.hadoop.hive.common.StatsSetupConst; + import org.apache.hadoop.hive.common.TableName; + import org.apache.hadoop.hive.metastore.Deadline; + import org.apache.hadoop.hive.metastore.FileMetadataHandler; + import org.apache.hadoop.hive.metastore.ObjectStore; + import org.apache.hadoop.hive.metastore.PartFilterExprUtil; + import org.apache.hadoop.hive.metastore.PartitionExpressionProxy; + import org.apache.hadoop.hive.metastore.RawStore; + import org.apache.hadoop.hive.metastore.TableType; + import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.AggrStats; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.api.ColumnStatistics; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; -import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; -import org.apache.hadoop.hive.metastore.api.CreationMetadata; -import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.FileMetadataExprType; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; -import org.apache.hadoop.hive.metastore.api.HiveObjectRef; -import org.apache.hadoop.hive.metastore.api.ISchema; -import org.apache.hadoop.hive.metastore.api.ISchemaName; -import org.apache.hadoop.hive.metastore.api.InvalidInputException; -import org.apache.hadoop.hive.metastore.api.InvalidObjectException; -import org.apache.hadoop.hive.metastore.api.InvalidOperationException; -import org.apache.hadoop.hive.metastore.api.InvalidPartitionException; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; -import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.PartitionEventType; -import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse; -import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; -import org.apache.hadoop.hive.metastore.api.PrincipalType; -import org.apache.hadoop.hive.metastore.api.PrivilegeBag; -import org.apache.hadoop.hive.metastore.api.WMNullablePool; -import org.apache.hadoop.hive.metastore.api.WMNullableResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMTrigger; -import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse; ++import org.apache.hadoop.hive.metastore.api.*; + import org.apache.hadoop.hive.metastore.cache.SharedCache.StatsType; + import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; + import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; -import org.apache.hadoop.hive.metastore.api.Role; -import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; -import org.apache.hadoop.hive.metastore.api.RuntimeStat; -import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint; -import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.SchemaVersion; -import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TableMeta; -import org.apache.hadoop.hive.metastore.api.Type; -import org.apache.hadoop.hive.metastore.api.UnknownDBException; -import org.apache.hadoop.hive.metastore.api.UnknownPartitionException; -import org.apache.hadoop.hive.metastore.api.UnknownTableException; -import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan; -import org.apache.hadoop.hive.metastore.api.WMMapping; -import org.apache.hadoop.hive.metastore.api.WMPool; -import org.apache.hadoop.hive.metastore.api.WriteEventInfo; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf; + import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; + import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; + import org.apache.hadoop.hive.metastore.utils.FileUtils; + import org.apache.hadoop.hive.metastore.utils.JavaUtils; + import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; + import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; + import org.apache.hadoop.hive.metastore.utils.StringUtils; + import org.apache.thrift.TException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.annotations.VisibleForTesting; + + import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; + import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog; + import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; + + // TODO filter->expr + // TODO functionCache + // TODO constraintCache + // TODO need sd nested copy? + // TODO String intern + // TODO monitor event queue + // TODO initial load slow? + // TODO size estimation + + public class CachedStore implements RawStore, Configurable { + private static ScheduledExecutorService cacheUpdateMaster = null; + private static List<Pattern> whitelistPatterns = null; + private static List<Pattern> blacklistPatterns = null; + // Default value set to 100 milliseconds for test purpose + private static long DEFAULT_CACHE_REFRESH_PERIOD = 100; + // Time after which metastore cache is updated from metastore DB by the background update thread + private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD; + private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false); + private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); + private RawStore rawStore = null; + private Configuration conf; + private PartitionExpressionProxy expressionProxy = null; + private static final SharedCache sharedCache = new SharedCache(); + + static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); + + @Override + public void setConf(Configuration conf) { + setConfInternal(conf); + initBlackListWhiteList(conf); + initSharedCache(conf); + startCacheUpdateService(conf, false, true); + } + + /** + * Similar to setConf but used from within the tests + * This does start the background thread for prewarm and update + * @param conf + */ + void setConfForTest(Configuration conf) { + setConfInternal(conf); + initBlackListWhiteList(conf); + initSharedCache(conf); + } + + private void setConfInternal(Configuration conf) { + String rawStoreClassName = + MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); + if (rawStore == null) { + try { + rawStore = (JavaUtils.getClass(rawStoreClassName, RawStore.class)).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); + } + } + rawStore.setConf(conf); + Configuration oldConf = this.conf; + this.conf = conf; + if (expressionProxy != null && conf != oldConf) { + LOG.warn("Unexpected setConf when we were already configured"); + } else { + expressionProxy = PartFilterExprUtil.createExpressionProxy(conf); + } + } + + private void initSharedCache(Configuration conf) { + long maxSharedCacheSizeInBytes = + MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY); + sharedCache.initialize(maxSharedCacheSizeInBytes); + if (maxSharedCacheSizeInBytes > 0) { + LOG.info("Maximum memory that the cache will use: {} GB", + maxSharedCacheSizeInBytes / (1024 * 1024 * 1024)); + } + } + + @VisibleForTesting + /** + * This initializes the caches in SharedCache by getting the objects from Metastore DB via + * ObjectStore and populating the respective caches + */ + static void prewarm(RawStore rawStore) { + if (isCachePrewarmed.get()) { + return; + } + long startTime = System.nanoTime(); + LOG.info("Prewarming CachedStore"); + while (!isCachePrewarmed.get()) { + // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy + Deadline.registerIfNot(1000000); + Collection<String> catalogsToCache; + try { + catalogsToCache = catalogsToCache(rawStore); + LOG.info("Going to cache catalogs: " + + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); + List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size()); + for (String catName : catalogsToCache) { + catalogs.add(rawStore.getCatalog(catName)); + } + sharedCache.populateCatalogsInCache(catalogs); + } catch (MetaException | NoSuchObjectException e) { + LOG.warn("Failed to populate catalogs in cache, going to try again", e); + // try again + continue; + } + LOG.info("Finished prewarming catalogs, starting on databases"); + List<Database> databases = new ArrayList<>(); + for (String catName : catalogsToCache) { + try { + List<String> dbNames = rawStore.getAllDatabases(catName); + LOG.info("Number of databases to prewarm in catalog {}: {}", catName, dbNames.size()); + for (String dbName : dbNames) { + try { + databases.add(rawStore.getDatabase(catName, dbName)); + } catch (NoSuchObjectException e) { + // Continue with next database + LOG.warn("Failed to cache database " + + DatabaseName.getQualified(catName, dbName) + ", moving on", e); + } + } + } catch (MetaException e) { + LOG.warn("Failed to cache databases in catalog " + catName + ", moving on", e); + } + } + sharedCache.populateDatabasesInCache(databases); + LOG.info( + "Databases cache is now prewarmed. Now adding tables, partitions and statistics to the cache"); + int numberOfDatabasesCachedSoFar = 0; + for (Database db : databases) { + String catName = StringUtils.normalizeIdentifier(db.getCatalogName()); + String dbName = StringUtils.normalizeIdentifier(db.getName()); + List<String> tblNames; + try { + tblNames = rawStore.getAllTables(catName, dbName); + } catch (MetaException e) { + LOG.warn("Failed to cache tables for database " + + DatabaseName.getQualified(catName, dbName) + ", moving on"); + // Continue with next database + continue; + } + tblsPendingPrewarm.addTableNamesForPrewarming(tblNames); + int totalTablesToCache = tblNames.size(); + int numberOfTablesCachedSoFar = 0; + while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) { + try { + String tblName = + StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm()); + if (!shouldCacheTable(catName, dbName, tblName)) { + continue; + } + Table table; + try { + table = rawStore.getTable(catName, dbName, tblName); + } catch (MetaException e) { + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + continue; + } + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + try { + ColumnStatistics tableColStats = null; + List<Partition> partitions = null; + List<ColumnStatistics> partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; + if (table.isSetPartitionKeys()) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + List<String> partNames = new ArrayList<>(partitions.size()); + for (Partition p : partitions) { + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); + } + if (!partNames.isEmpty()) { + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName, + tblName, partNames, colNames); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default + // partition + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List<FieldSchema> partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partCols = new ArrayList<>(); + List<String> partVals = new ArrayList<>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + } + } else { + Deadline.startTimer("getTableColumnStatistics"); + tableColStats = + rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); + Deadline.stopTimer(); + } ++ // TODO## should this take write ID into account? or at least cache write ID to verify? + // If the table could not cached due to memory limit, stop prewarm + boolean isSuccess = sharedCache.populateTableInCache(table, tableColStats, partitions, + partitionColStats, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + if (isSuccess) { + LOG.trace("Cached Database: {}'s Table: {}.", dbName, tblName); + } else { + LOG.info( + "Unable to cache Database: {}'s Table: {}, since the cache memory is full. " + + "Will stop attempting to cache any more tables.", + dbName, tblName); + completePrewarm(startTime); + return; + } + } catch (MetaException | NoSuchObjectException e) { + // Continue with next table + continue; + } + LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, ++numberOfTablesCachedSoFar, totalTablesToCache); + } catch (EmptyStackException e) { + // We've prewarmed this database, continue with the next one + continue; + } + } + LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName, + ++numberOfDatabasesCachedSoFar, databases.size()); + } + completePrewarm(startTime); + } + } + + private static void completePrewarm(long startTime) { + isCachePrewarmed.set(true); + LOG.info("CachedStore initialized"); + long endTime = System.nanoTime(); + LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); + sharedCache.completeTableCachePrewarm(); + } + + static class TablesPendingPrewarm { + private Stack<String> tableNames = new Stack<>(); + + private synchronized void addTableNamesForPrewarming(List<String> tblNames) { + tableNames.clear(); + if (tblNames != null) { + tableNames.addAll(tblNames); + } + } + + private synchronized boolean hasMoreTablesToPrewarm() { + return !tableNames.empty(); + } + + private synchronized String getNextTableNameToPrewarm() { + return tableNames.pop(); + } + + private synchronized void prioritizeTableForPrewarm(String tblName) { + // If the table is in the pending prewarm list, move it to the top + if (tableNames.remove(tblName)) { + tableNames.push(tblName); + } + } + } + + @VisibleForTesting + static void setCachePrewarmedState(boolean state) { + isCachePrewarmed.set(state); + } + + private static void initBlackListWhiteList(Configuration conf) { + if (whitelistPatterns == null || blacklistPatterns == null) { + whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); + blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); + } + } + + private static Collection<String> catalogsToCache(RawStore rs) throws MetaException { + Collection<String> confValue = + MetastoreConf.getStringCollection(rs.getConf(), ConfVars.CATALOGS_TO_CACHE); + if (confValue == null || confValue.isEmpty() || + (confValue.size() == 1 && confValue.contains(""))) { + return rs.getCatalogs(); + } else { + return confValue; + } + } + + @VisibleForTesting + /** + * This starts a background thread, which initially populates the SharedCache and later + * periodically gets updates from the metastore db + * + * @param conf + * @param runOnlyOnce + * @param shouldRunPrewarm + */ + static synchronized void startCacheUpdateService(Configuration conf, boolean runOnlyOnce, + boolean shouldRunPrewarm) { + if (cacheUpdateMaster == null) { + initBlackListWhiteList(conf); + if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { + cacheRefreshPeriodMS = MetastoreConf.getTimeVar(conf, + ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, TimeUnit.MILLISECONDS); + } + LOG.info("CachedStore: starting cache update service (run every {} ms", cacheRefreshPeriodMS); + cacheUpdateMaster = Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("CachedStore-CacheUpdateService: Thread-" + t.getId()); + t.setDaemon(true); + return t; + } + }); + if (!runOnlyOnce) { + cacheUpdateMaster.scheduleAtFixedRate(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, + cacheRefreshPeriodMS, TimeUnit.MILLISECONDS); + } + } + if (runOnlyOnce) { + // Some tests control the execution of the background update thread + cacheUpdateMaster.schedule(new CacheUpdateMasterWork(conf, shouldRunPrewarm), 0, + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + static synchronized boolean stopCacheUpdateService(long timeout) { + boolean tasksStoppedBeforeShutdown = false; + if (cacheUpdateMaster != null) { + LOG.info("CachedStore: shutting down cache update service"); + try { + tasksStoppedBeforeShutdown = + cacheUpdateMaster.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.info("CachedStore: cache update service was interrupted while waiting for tasks to " + + "complete before shutting down. Will make a hard stop now."); + } + cacheUpdateMaster.shutdownNow(); + cacheUpdateMaster = null; + } + return tasksStoppedBeforeShutdown; + } + + @VisibleForTesting + static void setCacheRefreshPeriod(long time) { + cacheRefreshPeriodMS = time; + } + + static class CacheUpdateMasterWork implements Runnable { + private boolean shouldRunPrewarm = true; + private final RawStore rawStore; + + CacheUpdateMasterWork(Configuration conf, boolean shouldRunPrewarm) { + this.shouldRunPrewarm = shouldRunPrewarm; + String rawStoreClassName = + MetastoreConf.getVar(conf, ConfVars.CACHED_RAW_STORE_IMPL, ObjectStore.class.getName()); + try { + rawStore = JavaUtils.getClass(rawStoreClassName, RawStore.class).newInstance(); + rawStore.setConf(conf); + } catch (InstantiationException | IllegalAccessException | MetaException e) { + // MetaException here really means ClassNotFound (see the utility method). + // So, if any of these happen, that means we can never succeed. + throw new RuntimeException("Cannot instantiate " + rawStoreClassName, e); + } + } + + @Override + public void run() { + if (!shouldRunPrewarm) { + // TODO: prewarm and update can probably be merged. + update(); + } else { + try { + prewarm(rawStore); + } catch (Exception e) { + LOG.error("Prewarm failure", e); + return; + } + } + } + + void update() { + Deadline.registerIfNot(1000000); + LOG.debug("CachedStore: updating cached objects"); + try { + for (String catName : catalogsToCache(rawStore)) { + List<String> dbNames = rawStore.getAllDatabases(catName); + // Update the database in cache + updateDatabases(rawStore, catName, dbNames); + for (String dbName : dbNames) { + // Update the tables in cache + updateTables(rawStore, catName, dbName); + List<String> tblNames; + try { + tblNames = rawStore.getAllTables(catName, dbName); + } catch (MetaException e) { + // Continue with next database + continue; + } + for (String tblName : tblNames) { + if (!shouldCacheTable(catName, dbName, tblName)) { + continue; + } + // Update the table column stats for a table in cache + updateTableColStats(rawStore, catName, dbName, tblName); + // Update the partitions for a table in cache + updateTablePartitions(rawStore, catName, dbName, tblName); + // Update the partition col stats for a table in cache + updateTablePartitionColStats(rawStore, catName, dbName, tblName); + // Update aggregate partition column stats for a table in cache + updateTableAggregatePartitionColStats(rawStore, catName, dbName, tblName); + } + } + } + sharedCache.incrementUpdateCount(); + } catch (MetaException e) { + LOG.error("Updating CachedStore: error happen when refresh; skipping this iteration", e); + } + } + + + private void updateDatabases(RawStore rawStore, String catName, List<String> dbNames) { + // Prepare the list of databases + List<Database> databases = new ArrayList<>(); + for (String dbName : dbNames) { + Database db; + try { + db = rawStore.getDatabase(catName, dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database - " + catName + "." + dbName + + " does not exist.", e); + } + } + sharedCache.refreshDatabasesInCache(databases); + } + + private void updateTables(RawStore rawStore, String catName, String dbName) { + List<Table> tables = new ArrayList<>(); + try { + List<String> tblNames = rawStore.getAllTables(catName, dbName); + for (String tblName : tblNames) { + if (!shouldCacheTable(catName, dbName, tblName)) { + continue; + } + Table table = rawStore.getTable(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName)); + tables.add(table); + } + sharedCache.refreshTablesInCache(catName, dbName, tables); + } catch (MetaException e) { + LOG.debug("Unable to refresh cached tables for database: " + dbName, e); + } + } + + + private void updateTableColStats(RawStore rawStore, String catName, String dbName, String tblName) { + try { + Table table = rawStore.getTable(catName, dbName, tblName); + if (!table.isSetPartitionKeys()) { + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); ++ // TODO## should this take write ID into account? or at least cache write ID to verify? + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); + Deadline.stopTimer(); + if (tableColStats != null) { ++ // TODO## should this take write ID into account? or at least cache write ID to verify? + sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Unable to refresh table column stats for table: " + tblName, e); + } + } + + private void updateTablePartitions(RawStore rawStore, String catName, String dbName, String tblName) { + try { + Deadline.startTimer("getPartitions"); + List<Partition> partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), partitions); + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } + } + + private void updateTablePartitionColStats(RawStore rawStore, String catName, String dbName, String tblName) { + try { + Table table = rawStore.getTable(catName, dbName, tblName); + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + List<String> partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); ++ // TODO## should this take write ID into account? or at least cache write ID to verify? + List<ColumnStatistics> partitionColStats = + rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + sharedCache.refreshPartitionColStatsInCache(catName, dbName, tblName, partitionColStats); + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } + } + + // Update cached aggregate stats for all partitions of a table and for all + // but default partition + private void updateTableAggregatePartitionColStats(RawStore rawStore, String catName, String dbName, + String tblName) { + try { + Table table = rawStore.getTable(catName, dbName, tblName); + List<String> partNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getAggregareStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate stats again + List<FieldSchema> partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partCols = new ArrayList<String>(); + List<String> partVals = new ArrayList<String>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); + } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, + e); + } + } + } + + @Override + public Configuration getConf() { + return rawStore.getConf(); + } + + @Override + public void shutdown() { + rawStore.shutdown(); + } + + @Override + public boolean openTransaction() { + return rawStore.openTransaction(); + } + + @Override + public boolean commitTransaction() { + return rawStore.commitTransaction(); + } + + @Override + public boolean isActiveTransaction() { + return rawStore.isActiveTransaction(); + } + + @Override + public void rollbackTransaction() { + rawStore.rollbackTransaction(); + } + + @Override + public void createCatalog(Catalog cat) throws MetaException { + rawStore.createCatalog(cat); + sharedCache.addCatalogToCache(cat); + } + + @Override + public void alterCatalog(String catName, Catalog cat) throws MetaException, + InvalidOperationException { + rawStore.alterCatalog(catName, cat); + sharedCache.alterCatalogInCache(StringUtils.normalizeIdentifier(catName), cat); + } + + @Override + public Catalog getCatalog(String catalogName) throws NoSuchObjectException, MetaException { + if (!sharedCache.isCatalogCachePrewarmed()) { + return rawStore.getCatalog(catalogName); + } + Catalog cat = sharedCache.getCatalogFromCache(normalizeIdentifier(catalogName)); + if (cat == null) { + throw new NoSuchObjectException(); + } + return cat; + } + + @Override + public List<String> getCatalogs() throws MetaException { + if (!sharedCache.isCatalogCachePrewarmed()) { + return rawStore.getCatalogs(); + } + return sharedCache.listCachedCatalogs(); + } + + @Override + public void dropCatalog(String catalogName) throws NoSuchObjectException, MetaException { + rawStore.dropCatalog(catalogName); + catalogName = catalogName.toLowerCase(); + sharedCache.removeCatalogFromCache(catalogName); + } + + @Override + public void createDatabase(Database db) throws InvalidObjectException, MetaException { + rawStore.createDatabase(db); + sharedCache.addDatabaseToCache(db); + } + + @Override + public Database getDatabase(String catName, String dbName) throws NoSuchObjectException { + if (!sharedCache.isDatabaseCachePrewarmed()) { + return rawStore.getDatabase(catName, dbName); + } + dbName = dbName.toLowerCase(); + Database db = sharedCache.getDatabaseFromCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName)); + if (db == null) { + throw new NoSuchObjectException(); + } + return db; + } + + @Override + public boolean dropDatabase(String catName, String dbName) throws NoSuchObjectException, MetaException { + boolean succ = rawStore.dropDatabase(catName, dbName); + if (succ) { + sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName)); + } + return succ; + } + + @Override + public boolean alterDatabase(String catName, String dbName, Database db) + throws NoSuchObjectException, MetaException { + boolean succ = rawStore.alterDatabase(catName, dbName, db); + if (succ) { + sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), db); + } + return succ; + } + + @Override + public List<String> getDatabases(String catName, String pattern) throws MetaException { + if (!sharedCache.isDatabaseCachePrewarmed()) { + return rawStore.getDatabases(catName, pattern); + } + return sharedCache.listCachedDatabases(catName, pattern); + } + + @Override + public List<String> getAllDatabases(String catName) throws MetaException { + if (!sharedCache.isDatabaseCachePrewarmed()) { + return rawStore.getAllDatabases(catName); + } + return sharedCache.listCachedDatabases(catName); + } + + @Override + public boolean createType(Type type) { + return rawStore.createType(type); + } + + @Override + public Type getType(String typeName) { + return rawStore.getType(typeName); + } + + @Override + public boolean dropType(String typeName) { + return rawStore.dropType(typeName); + } + + private void validateTableType(Table tbl) { + // If the table has property EXTERNAL set, update table type + // accordingly + String tableType = tbl.getTableType(); + boolean isExternal = Boolean.parseBoolean(tbl.getParameters().get("EXTERNAL")); + if (TableType.MANAGED_TABLE.toString().equals(tableType)) { + if (isExternal) { + tableType = TableType.EXTERNAL_TABLE.toString(); + } + } + if (TableType.EXTERNAL_TABLE.toString().equals(tableType)) { + if (!isExternal) { + tableType = TableType.MANAGED_TABLE.toString(); + } + } + tbl.setTableType(tableType); + } + + @Override + public void createTable(Table tbl) throws InvalidObjectException, MetaException { + rawStore.createTable(tbl); + String catName = normalizeIdentifier(tbl.getCatName()); + String dbName = normalizeIdentifier(tbl.getDbName()); + String tblName = normalizeIdentifier(tbl.getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return; + } + validateTableType(tbl); + sharedCache.addTableToCache(catName, dbName, tblName, tbl); + } + + @Override + public boolean dropTable(String catName, String dbName, String tblName) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + boolean succ = rawStore.dropTable(catName, dbName, tblName); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.removeTableFromCache(catName, dbName, tblName); + } + return succ; + } + + @Override + public Table getTable(String catName, String dbName, String tblName) throws MetaException { ++ return getTable(catName, dbName, tblName, -1, null); ++ } ++ ++ // TODO: if writeIdList is not null, check isolation level compliance for SVS, ++ // possibly with getTableFromCache() with table snapshot in cache. ++ @Override ++ public Table getTable(String catName, String dbName, String tblName, ++ long txnId, String writeIdList) ++ throws MetaException { + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { - return rawStore.getTable(catName, dbName, tblName); ++ return rawStore.getTable(catName, dbName, tblName, txnId,writeIdList); + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); - if (tbl == null) { ++ if (tbl == null || writeIdList != null) { + // This table is not yet loaded in cache + // If the prewarm thread is working on this table's database, + // let's move this table to the top of tblNamesBeingPrewarmed stack, + // so that it gets loaded to the cache faster and is available for subsequent requests + tblsPendingPrewarm.prioritizeTableForPrewarm(tblName); - return rawStore.getTable(catName, dbName, tblName); ++ return rawStore.getTable(catName, dbName, tblName, txnId, writeIdList); + } + if (tbl != null) { + tbl.unsetPrivileges(); + tbl.setRewriteEnabled(tbl.isRewriteEnabled()); + } + return tbl; + } + + @Override + public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartition(part); + if (succ) { + String dbName = normalizeIdentifier(part.getDbName()); + String tblName = normalizeIdentifier(part.getTableName()); + String catName = part.isSetCatName() ? normalizeIdentifier(part.getCatName()) : DEFAULT_CATALOG_NAME; + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.addPartitionToCache(catName, dbName, tblName, part); + } + return succ; + } + + @Override + public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts) + throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(catName, dbName, tblName, parts); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.addPartitionsToCache(catName, dbName, tblName, parts); + } + return succ; + } + + @Override + public boolean addPartitions(String catName, String dbName, String tblName, PartitionSpecProxy partitionSpec, + boolean ifNotExists) throws InvalidObjectException, MetaException { + boolean succ = rawStore.addPartitions(catName, dbName, tblName, partitionSpec, ifNotExists); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + sharedCache.addPartitionToCache(catName, dbName, tblName, part); + } + } + return succ; + } + + @Override + public Partition getPartition(String catName, String dbName, String tblName, List<String> part_vals) + throws MetaException, NoSuchObjectException { ++ return getPartition(catName, dbName, tblName, part_vals, -1, null); ++ } ++ ++ // TODO: the same as getTable() ++ @Override ++ public Partition getPartition(String catName, String dbName, String tblName, ++ List<String> part_vals, long txnId, String writeIdList) ++ throws MetaException, NoSuchObjectException { + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { - return rawStore.getPartition(catName, dbName, tblName, part_vals); ++ return rawStore.getPartition( ++ catName, dbName, tblName, part_vals, txnId, writeIdList); + } + Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, part_vals); - if (part == null) { ++ if (part == null || writeIdList != null) { + // The table containing the partition is not yet loaded in cache - return rawStore.getPartition(catName, dbName, tblName, part_vals); ++ return rawStore.getPartition( ++ catName, dbName, tblName, part_vals, txnId, writeIdList); + } + return part; + } + + @Override + public boolean doesPartitionExist(String catName, String dbName, String tblName, + List<FieldSchema> partKeys, List<String> part_vals) + throws MetaException, NoSuchObjectException { + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals); + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl == null) { + // The table containing the partition is not yet loaded in cache + return rawStore.doesPartitionExist(catName, dbName, tblName, partKeys, part_vals); + } + return sharedCache.existPartitionFromCache(catName, dbName, tblName, part_vals); + } + + @Override + public boolean dropPartition(String catName, String dbName, String tblName, List<String> part_vals) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { + boolean succ = rawStore.dropPartition(catName, dbName, tblName, part_vals); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.removePartitionFromCache(catName, dbName, tblName, part_vals); + } + return succ; + } + + @Override + public void dropPartitions(String catName, String dbName, String tblName, List<String> partNames) + throws MetaException, NoSuchObjectException { + rawStore.dropPartitions(catName, dbName, tblName, partNames); + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return; + } + List<List<String>> partVals = new ArrayList<>(); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } + sharedCache.removePartitionsFromCache(catName, dbName, tblName, partVals); + } + + @Override + public List<Partition> getPartitions(String catName, String dbName, String tblName, int max) + throws MetaException, NoSuchObjectException { + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getPartitions(catName, dbName, tblName, max); + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl == null) { + // The table containing the partitions is not yet loaded in cache + return rawStore.getPartitions(catName, dbName, tblName, max); + } + List<Partition> parts = sharedCache.listCachedPartitions(catName, dbName, tblName, max); + return parts; + } + + @Override + public Map<String, String> getPartitionLocations(String catName, String dbName, String tblName, + String baseLocationToNotShow, int max) { + return rawStore.getPartitionLocations(catName, dbName, tblName, baseLocationToNotShow, max); + } + + @Override - public void alterTable(String catName, String dbName, String tblName, Table newTable) - throws InvalidObjectException, MetaException { - rawStore.alterTable(catName, dbName, tblName, newTable); ++ public void alterTable(String catName, String dbName, String tblName, Table newTable, ++ long txnId, String validWriteIds) throws InvalidObjectException, MetaException { ++ rawStore.alterTable(catName, dbName, tblName, newTable, txnId, validWriteIds); + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + String newTblName = normalizeIdentifier(newTable.getTableName()); + if (!shouldCacheTable(catName, dbName, tblName) && + !shouldCacheTable(catName, dbName, newTblName)) { + return; + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache + return; + } + if (shouldCacheTable(catName, dbName, tblName) && shouldCacheTable(catName, dbName, newTblName)) { + // If old table is in the cache and the new table can also be cached + sharedCache.alterTableInCache(catName, dbName, tblName, newTable); + } else if (!shouldCacheTable(catName, dbName, tblName) && shouldCacheTable(catName, dbName, newTblName)) { + // If old table is *not* in the cache but the new table can be cached + sharedCache.addTableToCache(catName, dbName, newTblName, newTable); + } else if (shouldCacheTable(catName, dbName, tblName) && !shouldCacheTable(catName, dbName, newTblName)) { + // If old table is in the cache but the new table *cannot* be cached + sharedCache.removeTableFromCache(catName, dbName, tblName); + } + } + + @Override + public void updateCreationMetadata(String catName, String dbname, String tablename, CreationMetadata cm) + throws MetaException { + rawStore.updateCreationMetadata(catName, dbname, tablename, cm); + } + + @Override + public List<String> getTables(String catName, String dbName, String pattern) throws MetaException { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + return rawStore.getTables(catName, dbName, pattern); + } + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), pattern, (short) -1); + } + + @Override + public List<String> getTables(String catName, String dbName, String pattern, TableType tableType) + throws MetaException { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + return rawStore.getTables(catName, dbName, pattern, tableType); + } + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), pattern, tableType); + } + + @Override + public List<String> getMaterializedViewsForRewriting(String catName, String dbName) + throws MetaException, NoSuchObjectException { + return rawStore.getMaterializedViewsForRewriting(catName, dbName); + } + + @Override + public List<TableMeta> getTableMeta(String catName, String dbNames, String tableNames, + List<String> tableTypes) throws MetaException { + // TODO Check if all required tables are allowed, if so, get it from cache + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); + } + return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbNames), + StringUtils.normalizeIdentifier(tableNames), tableTypes); + } + + @Override + public List<Table> getTableObjectsByName(String catName, String dbName, List<String> tblNames) + throws MetaException, UnknownDBException { + dbName = normalizeIdentifier(dbName); + catName = normalizeIdentifier(catName); + boolean missSomeInCache = false; + for (String tblName : tblNames) { + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + missSomeInCache = true; + break; + } + } + if (!isCachePrewarmed.get() || missSomeInCache) { + return rawStore.getTableObjectsByName(catName, dbName, tblNames); + } + List<Table> tables = new ArrayList<>(); + for (String tblName : tblNames) { + tblName = normalizeIdentifier(tblName); + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl == null) { + tbl = rawStore.getTable(catName, dbName, tblName); + } + tables.add(tbl); + } + return tables; + } + + @Override + public List<String> getAllTables(String catName, String dbName) throws MetaException { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + return rawStore.getAllTables(catName, dbName); + } + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName)); + } + + @Override + public List<String> listTableNamesByFilter(String catName, String dbName, String filter, + short max_tables) + throws MetaException, UnknownDBException { + if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()) { + return rawStore.listTableNamesByFilter(catName, dbName, filter, max_tables); + } + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), + StringUtils.normalizeIdentifier(dbName), filter, max_tables); + } + + @Override + public List<String> listPartitionNames(String catName, String dbName, String tblName, + short max_parts) throws MetaException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.listPartitionNames(catName, dbName, tblName, max_parts); + } + Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache + return rawStore.listPartitionNames(catName, dbName, tblName, max_parts); + } + List<String> partitionNames = new ArrayList<>(); + int count = 0; + for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, max_parts)) { + if (max_parts == -1 || count < max_parts) { + partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues())); + } + } + return partitionNames; + } + + @Override + public PartitionValuesResponse listPartitionValues(String catName, String db_name, String tbl_name, + List<FieldSchema> cols, boolean applyDistinct, String filter, boolean ascending, + List<FieldSchema> order, long maxParts) throws MetaException { + throw new UnsupportedOperationException(); + } + + @Override + public void alterPartition(String catName, String dbName, String tblName, List<String> partVals, - Partition newPart) throws InvalidObjectException, MetaException { - rawStore.alterPartition(catName, dbName, tblName, partVals, newPart); ++ Partition newPart, long queryTxnId, String queryValidWriteIds) ++ throws InvalidObjectException, MetaException { ++ rawStore.alterPartition(catName, dbName, tblName, partVals, newPart, queryTxnId, queryValidWriteIds); + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return; + } + sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, newPart); + } + + @Override + public void alterPartitions(String catName, String dbName, String tblName, - List<List<String>> partValsList, List<Partition> newParts) ++ List<List<String>> partValsList, List<Partition> newParts, ++ long writeId, long txnId, String validWriteIds) + throws InvalidObjectException, MetaException { - rawStore.alterPartitions(catName, dbName, tblName, partValsList, newParts); ++ rawStore.alterPartitions( ++ catName, dbName, tblName, partValsList, newParts, writeId, txnId, validWriteIds); + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return; + } ++ // TODO: modify the following method for the case when writeIdList != null. + sharedCache.alterPartitionsInCache(catName, dbName, tblName, partValsList, newParts); + } + + private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, + String defaultPartName, short maxParts, List<String> result, SharedCache sharedCache) + throws MetaException, NoSuchObjectException { + List<Partition> parts = + sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getCatName()), + StringUtils.normalizeIdentifier(table.getDbName()), + StringUtils.normalizeIdentifier(table.getTableName()), maxParts); + for (Partition part : parts) { + result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); + } + if (defaultPartName == null || defaultPartName.isEmpty()) { + defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); + } + return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, + result); + } + + @Override + public List<Partition> getPartitionsByFilter(String catName, String dbName, String tblName, + String filter, short maxParts) + throws MetaException, NoSuchObjectException { + return rawStore.getPartitionsByFilter(catName, dbName, tblName, filter, maxParts); + } + + @Override + public boolean getPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr, + String defaultPartitionName, short maxParts, List<Partition> result) throws TException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } + List<String> partNames = new LinkedList<>(); + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionsByExpr(catName, dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, + defaultPartitionName, maxParts, partNames, sharedCache); + return hasUnknownPartitions; + } + + @Override + public int getNumPartitionsByFilter(String catName, String dbName, String tblName, String filter) + throws MetaException, NoSuchObjectException { + return rawStore.getNumPartitionsByFilter(catName, dbName, tblName, filter); + } + + @Override + public int getNumPartitionsByExpr(String catName, String dbName, String tblName, byte[] expr) + throws MetaException, NoSuchObjectException { + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr); + } + String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partNames = new LinkedList<>(); + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getNumPartitionsByExpr(catName, dbName, tblName, expr); + } + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, + sharedCache); + return partNames.size(); + } + + private static List<String> partNameToVals(String name) { + if (name == null) { + return null; + } + List<String> vals = new ArrayList<>(); + String[] kvp = name.split("/"); + for (String kv : kvp) { + vals.add(FileUtils.unescapePathName(kv.substring(kv.indexOf('=') + 1))); + } + return vals; + } + + @Override + public List<Partition> getPartitionsByNames(String catName, String dbName, String tblName, + List<String> partNames) throws MetaException, NoSuchObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionsByNames(catName, dbName, tblName, partNames); + } + List<Partition> partitions = new ArrayList<>(); + for (String partName : partNames) { + Partition part = sharedCache.getPartitionFromCache(catName, dbName, tblName, partNameToVals(partName)); + if (part!=null) { + partitions.add(part); + } + } + return partitions; + } + + @Override + public Table markPartitionForEvent(String catName, String dbName, String tblName, + Map<String, String> partVals, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.markPartitionForEvent(catName, dbName, tblName, partVals, evtType); + } + + @Override + public boolean isPartitionMarkedForEvent(String catName, String dbName, String tblName, + Map<String, String> partName, PartitionEventType evtType) + throws MetaException, UnknownTableException, InvalidPartitionException, + UnknownPartitionException { + return rawStore.isPartitionMarkedForEvent(catName, dbName, tblName, partName, evtType); + } + + @Override + public boolean addRole(String rowName, String ownerName) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.addRole(rowName, ownerName); + } + + @Override + public boolean removeRole(String roleName) + throws MetaException, NoSuchObjectException { + return rawStore.removeRole(roleName); + } + + @Override + public boolean grantRole(Role role, String userName, + PrincipalType principalType, String grantor, PrincipalType grantorType, + boolean grantOption) + throws MetaException, NoSuchObjectException, InvalidObjectException { + return rawStore.grantRole(role, userName, principalType, grantor, grantorType, grantOption); + } + + @Override + public boolean revokeRole(Role role, String userName, + PrincipalType principalType, boolean grantOption) + throws MetaException, NoSuchObjectException { + return rawStore.revokeRole(role, userName, principalType, grantOption); + } + + @Override + public PrincipalPrivilegeSet getUserPrivilegeSet(String userName, + List<String> groupNames) throws InvalidObjectException, MetaException { + return rawStore.getUserPrivilegeSet(userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getDBPrivilegeSet(String catName, String dbName, String userName, + List<String> groupNames) throws InvalidObjectException, MetaException { + return rawStore.getDBPrivilegeSet(catName, dbName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getTablePrivilegeSet(String catName, String dbName, + String tableName, String userName, List<String> groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getTablePrivilegeSet(catName, dbName, tableName, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getPartitionPrivilegeSet(String catName, String dbName, + String tableName, String partition, String userName, + List<String> groupNames) throws InvalidObjectException, MetaException { + return rawStore.getPartitionPrivilegeSet(catName, dbName, tableName, partition, userName, groupNames); + } + + @Override + public PrincipalPrivilegeSet getColumnPrivilegeSet(String catName, String dbName, + String tableName, String partitionName, String columnName, + String userName, List<String> groupNames) + throws InvalidObjectException, MetaException { + return rawStore.getColumnPrivilegeSet(catName, dbName, tableName, partitionName, columnName, userName, groupNames); + } + + @Override + public List<HiveObjectPrivilege> listPrincipalGlobalGrants( + String principalName, PrincipalType principalType) { + return rawStore.listPrincipalGlobalGrants(principalName, principalType); + } + + @Override + public List<HiveObjectPrivilege> listPrincipalDBGrants(String principalName, + PrincipalType principalType, String catName, String dbName) { + return rawStore.listPrincipalDBGrants(principalName, principalType, catName, dbName); + } + + @Override + public List<HiveObjectPrivilege> listAllTableGrants(String principalName, + PrincipalType principalType, String catName, String dbName, String tableName) { + return rawStore.listAllTableGrants(principalName, principalType, catName, dbName, tableName); + } + + @Override + public List<HiveObjectPrivilege> listPrincipalPartitionGrants( + String principalName, PrincipalType principalType, String catName, String dbName, + String tableName, List<String> partValues, String partName) { + return rawStore.listPrincipalPartitionGrants(principalName, principalType, catName, dbName, tableName, partValues, partName); + } + + @Override + public List<HiveObjectPrivilege> listPrincipalTableColumnGrants( + String principalName, PrincipalType principalType, String catName, String dbName, + String tableName, String columnName) { + return rawStore.listPrincipalTableColumnGrants(principalName, principalType, catName, dbName, tableName, columnName); + } + + @Override + public List<HiveObjectPrivilege> listPrincipalPartitionColumnGrants( + String principalName, PrincipalType principalType, String catName, String dbName, + String tableName, List<String> partValues, String partName, + String columnName) { + return rawStore.listPrincipalPartitionColumnGrants(principalName, principalType, catName, dbName, tableName, partValues, partName, columnName); + } + + @Override + public boolean grantPrivileges(PrivilegeBag privileges) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.grantPrivileges(privileges); + } + + @Override + public boolean revokePrivileges(PrivilegeBag privileges, boolean grantOption) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.revokePrivileges(privileges, grantOption); + } + + @Override + public boolean refreshPrivileges(HiveObjectRef objToRefresh, String authorizer, PrivilegeBag grantPrivileges) + throws InvalidObjectException, MetaException, NoSuchObjectException { + return rawStore.refreshPrivileges(objToRefresh, authorizer, grantPrivileges); + } + + @Override + public Role getRole(String roleName) throws NoSuchObjectException { + return rawStore.getRole(roleName); + } + + @Override + public List<String> listRoleNames() { + return rawStore.listRoleNames(); + } + + @Override + public List<Role> listRoles(String principalName, + PrincipalType principalType) { + return rawStore.listRoles(principalName, principalType); + } + + @Override + public List<RolePrincipalGrant> listRolesWithGrants(String principalName, + PrincipalType principalType) { + return rawStore.listRolesWithGrants(principalName, principalType); + } + + @Override + public List<RolePrincipalGrant> listRoleMembers(String roleName) { + return rawStore.listRoleMembers(roleName); + } + + @Override + public Partition getPartitionWithAuth(String catName, String dbName, String tblName, + List<String> partVals, String userName, List<String> groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionWithAuth(catName, dbName, tblName, partVals, userName, groupNames); + } + Partition p = sharedCache.getPartitionFromCache(catName, dbName, tblName, partVals); + if (p != null) { + String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName, + userName, groupNames); + p.setPrivileges(privs); + } + return p; + } + + @Override + public List<Partition> getPartitionsWithAuth(String catName, String dbName, String tblName, + short maxParts, String userName, List<String> groupNames) + throws MetaException, NoSuchObjectException, InvalidObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionsWithAuth(catName, dbName, tblName, maxParts, userName, groupNames); + } + List<Partition> partitions = new ArrayList<>(); + int count = 0; + for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) { + if (maxParts == -1 || count < maxParts) { + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); + PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(catName, dbName, tblName, partName, + userName, groupNames); + part.setPrivileges(privs); + partitions.add(part); + count++; + } + } + return partitions; + } + + @Override + public List<String> listPartitionNamesPs(String catName, String dbName, String tblName, + List<String> partVals, short maxParts) + throws MetaException, NoSuchObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.listPartitionNamesPs(catName, dbName, tblName, partVals, maxParts); + } + List<String> partNames = new ArrayList<>(); + int count = 0; + for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) { + boolean psMatch = true; + for (int i=0;i<partVals.size();i++) { + String psVal = partVals.get(i); + String partVal = part.getValues().get(i); + if (psVal!=null && !psVal.isEmpty() && !psVal.equals(partVal)) { + psMatch = false; + break; + } + } + if (!psMatch) { + continue; + } + if (maxParts == -1 || count < maxParts) { + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); + count++; + } + } + return partNames; + } + + @Override + public List<Partition> listPartitionsPsWithAuth(String catName, String dbName, String tblName, + List<String> partVals, short maxParts, String userName, List<String> groupNames) + throws MetaException, InvalidObjectException, NoSuchObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts, userName, + groupNames); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.listPartitionsPsWithAuth(catName, dbName, tblName, partVals, maxParts, userName, + groupNames); + } + List<Partition> partitions = new ArrayList<>(); + int count = 0; + for (Partition part : sharedCache.listCachedPartitions(catName, dbName, tblName, maxParts)) { + boolean psMatch = true; + for (int i = 0; i < partVals.size(); i++) { + String psVal = partVals.get(i); + String partVal = part.getValues().get(i); + if (psVal != null && !psVal.isEmpty() && !psVal.equals(partVal)) { + psMatch = false; + break; + } + } + if (!psMatch) { + continue; + } + if (maxParts == -1 || count < maxParts) { + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); + PrincipalPrivilegeSet privs = + getPartitionPrivilegeSet(catName, dbName, tblName, partName, userName, groupNames); + part.setPrivileges(privs); + partitions.add(part); + } + } + return partitions; + } + + @Override - public boolean updateTableColumnStatistics(ColumnStatistics colStats) ++ public boolean updateTableColumnStatistics(ColumnStatistics colStats, long txnId, String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.updateTableColumnStatistics(colStats); ++ boolean succ = rawStore.updateTableColumnStatistics(colStats, txnId, validWriteIds, writeId); + if (succ) { + String catName = colStats.getStatsDesc().isSetCatName() ? + normalizeIdentifier(colStats.getStatsDesc().getCatName()) : + getDefaultCatalog(conf); + String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return succ; + } + List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(table.getParameters(), colNames); + sharedCache.alterTableInCache(catName, dbName, tblName, table); + sharedCache.updateTableColStatsInCache(catName, dbName, tblName, statsObjs); + } + return succ; + } + + @Override + public ColumnStatistics getTableColumnStatistics(String catName, String dbName, String tblName, + List<String> colNames) throws MetaException, NoSuchObjectException { ++ return getTableColumnStatistics(catName, dbName, tblName, colNames, -1, null); ++ } ++ ++ // TODO: the same as getTable() ++ @Override ++ public ColumnStatistics getTableColumnStatistics( ++ String catName, String dbName, String tblName, List<String> colNames, ++ long txnId, String writeIdList) ++ throws MetaException, NoSuchObjectException { + catName = StringUtils.normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { - return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); ++ return rawStore.getTableColumnStatistics( ++ catName, dbName, tblName, colNames, txnId, writeIdList); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null) { ++ if (table == null || writeIdList != null) { + // The table is not yet loaded in cache - return rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); ++ return rawStore.getTableColumnStatistics( ++ catName, dbName, tblName, colNames, txnId, writeIdList); + } + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); + List<ColumnStatisticsObj> colStatObjs = + sharedCache.getTableColStatsFromCache(catName, dbName, tblName, colNames); + return new ColumnStatistics(csd, colStatObjs); + } + + @Override + public boolean deleteTableColumnStatistics(String catName, String dbName, String tblName, + String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean succ = rawStore.deleteTableColumnStatistics(catName, dbName, tblName, colName); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.removeTableColStatsFromCache(catName, dbName, tblName, colName); + } + return succ; + } + + @Override - public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals) ++ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List<String> partVals, ++ long txnId, String validWriteIds, long writeId) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals); ++ boolean succ = rawStore.updatePartitionColumnStatistics(colStats, partVals, txnId, validWriteIds, writeId); + if (succ) { + String catName = colStats.getStatsDesc().isSetCatName() ? + normalizeIdentifier(colStats.getStatsDesc().getCatName()) : DEFAULT_CATALOG_NAME; + String dbName = normalizeIdentifier(colStats.getStatsDesc().getDbName()); + String tblName = normalizeIdentifier(colStats.getStatsDesc().getTableName()); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj(); + Partition part = getPartition(catName, dbName, tblName, partVals); + List<String> colNames = new ArrayList<>(); + for (ColumnStatisticsObj statsObj : statsObjs) { + colNames.add(statsObj.getColName()); + } + StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); + sharedCache.alterPartitionInCache(catName, dbName, tblName, partVals, part); + sharedCache.updatePartitionColStatsInCache(catName, dbName, tblName, partVals, colStats.getStatsObj()); + } + return succ; + } + + @Override + // TODO: calculate from cached values. + public List<ColumnStatistics> getPartitionColumnStatistics(String catName, String dbName, String tblName, + List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException { + return rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); + } + + @Override ++ public List<ColumnStatistics> getPartitionColumnStatistics( ++ String catName, String dbName, String tblName, List<String> partNames, ++ List<String> colNames, long txnId, String writeIdList) ++ throws MetaException, NoSuchObjectException { ++ return rawStore.getPartitionColumnStatistics( ++ catName, dbName, tblName, partNames, colNames, txnId, writeIdList); ++ } ++ ++ @Override + public boolean deletePartitionColumnStatistics(String catName, String dbName, String tblName, String partName, + List<String> partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { + boolean succ = + rawStore.deletePartitionColumnStatistics(catName, dbName, tblName, partName, partVals, colName); + if (succ) { + catName = normalizeIdentifier(catName); + dbName = normalizeIdentifier(dbName); + tblName = normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { + return succ; + } + sharedCache.removePartitionColStatsFromCache(catName, dbName, tblName, partVals, colName); + } + return succ; + } + + @Override + public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, List<String> partNames, + List<String> colNames) throws MetaException, NoSuchObjectException { ++ return get_aggr_stats_for(catName, dbName, tblName, partNames, colNames, -1, null); ++ } ++ ++ @Override ++ // TODO: the same as getTable() for transactional stats. ++ public AggrStats get_aggr_stats_for(String catName, String dbName, String tblName, ++ List<String> partNames, List<String> colNames, ++ long txnId, String writeIdList) ++ throws MetaException, NoSuchObjectException { + List<ColumnStatisticsObj> colStats; + catName = normalizeIdentifier(catName); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(catName, dbName, tblName)) { - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); ++ rawStore.get_aggr_stats_for( ++ catName, dbName, tblName, partNames, colNames, txnId, writeIdList); + } + Table table = sharedCache.getTableFromCache(catName, dbName, tblName); - if (table == null) { ++ if (table == null || writeIdList != null) { + // The table is not yet loaded in cache - return rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); ++ return rawStore.get_aggr_stats_for( ++ catName, dbName, tblName, partNames, colNames, txnId, writeIdList); + } + List<String> allPartNames = rawStore.listPartitionNames(catName, dbName, tblName, (short) -1); + if (partNames.size() == allPartNames.size()) { + colStats = sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALL); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } else if (partNames.size() == (allPartNames.size() - 1)) { + String defaultPartitionName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); + if (!partNames.contains(defaultPartitionName)) { + colStats = + sharedCache.getAggrStatsFromCache(catName, dbName, tblName, colNames, StatsType.ALLBUTDEFAULT); + if (colStats != null) { + return new AggrStats(colStats, partNames.size()); + } + } + } + LOG.debug("Didn't find aggr stats in cache. Merging them. tblName= {}, parts= {}, cols= {}", + tblName, partNames, colNames); + MergedColumnStatsForPartitions mergedColStats = + mergeColStatsForPartitions(catName, dbName, tblName, partNames, colNames, sharedCache); + return new AggrStats(mergedColStats.getColStats(), mergedColStats.getPartsFound()); + } + + private MergedColumnStatsForPartitions mergeColStatsForPartitions( + String catName, String dbName, String tblName, List<String> partNames, List<String> colNames, + SharedCache sharedCache) throws MetaException { + final boolean useDensityFunctionForNDVEstimation = + MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_NDV_DENSITY_FUNCTION); + final double ndvTuner = MetastoreConf.getDoubleVar(getConf(), ConfVars.STATS_NDV_TUNER); + Map<ColumnStatsAggregator, List<ColStatsObjWithSourceInfo>> colStatsMap = new HashMap<>(); + boolean areAllPartsFound = true; + long partsFound = 0; + for (String colName : colNames) { + long partsFoundForColumn = 0; + ColumnStatsAggregator colStatsAggregator = null; + List<ColStatsObjWithSourceInfo> colStatsWithPartInfoList = new ArrayList<>(); + for (String partName : partNames) { + ColumnStatisticsObj colStatsForPart = + sharedCache.getPartitionColStatsFromCache(catName, dbName, tblName, partNameToVals(partName), colName); + if (colStatsForPart != null) { + ColStatsObjWithSourceInfo colStatsWithPartInfo = + new ColStatsObjWithSourceInfo(colStatsForPart, catName, dbName, tblName, partName); + colStatsWithPartInfoList.add(colStatsWithPartInfo); + if (colStatsAggregator == null) { + colStatsAggregator = ColumnStatsAggregatorFactory.getColumnStatsAggregator( + colStatsForPart.getStatsData().getSetField(), useDensityFunctionForNDVEstimation, + ndvTuner); + } + partsFoundForColumn++; + } else { + LOG.debug( + "Stats not found in CachedStore for: dbName={} tblName={} partName={} colName={}", + dbName, tblName, partName, colName); + } + } + if (colStatsWithPartInfoList.size() > 0) { + colStatsMap.put(colStatsAggregator, colStatsWithPartInfoList); + } + if (partsFoundForColumn == partNames.size()) { + partsFound++; + } + if (colStatsMap.size() < 1) { + LOG.debug("No stats data found for: dbName={} tblName= {} partNames= {} colNames= ", dbName, + tblName, partNames, colNames); + return new MergedColumnStatsForPartitions(new ArrayList<ColumnStatisticsObj>(), 0); + } + } + // Note that enableBitVector does not apply here because ColumnStatisticsObj + // itself will tell whether bitvector is null or not and aggr logic can automatically apply. + return new MergedColumnStatsForPartitions(MetaStoreUtils.aggrPartitionStats(colStatsMap, + partNames, areAllPartsFound, useDensityFunctionForNDVEstimation, ndvTuner), partsFound); + } + + class MergedColumnStatsForPartitions { + List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>(); + long partsFound; + + MergedColumnStatsForPartitions(List<ColumnStatisticsObj> colStats, long partsFound) { + this.colStats = colStats; + this.partsFound = partsFound; + } + + List<ColumnStatisticsObj> getColStats() { + return colStats; + } + + long getPartsFound() { + return partsFound; + } + } + + @Override + public long cleanupEvents() { + return rawStore.cleanupEvents(); + } + + @Override + public boolean addToken(String tokenIdentifier, String delegationToken) { + return rawStore.addToken(tokenIdentifier, delegationToken); + } + + @Override + public boolean removeToken(String tokenIdentifier) { + return rawStore.removeToken(tokenIdentifier); + } + + @Override + public String getToken(String tokenIdentifier) { + return rawStore.getToken(tokenIdentifier); + } + + @Override + public List<String> getAllTokenIdentifiers() { +
<TRUNCATED>
