Repository: hive Updated Branches: refs/heads/master 42187fdbc -> b3fe6522e
HIVE-18840: CachedStore: Prioritize loading of recently accessed tables during prewarm (Vaibhav Gumashta reviewed by Daniel Dai) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3fe6522 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3fe6522 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3fe6522 Branch: refs/heads/master Commit: b3fe6522e651fa4f00f1a1a75e6f12c132eacf21 Parents: 42187fd Author: Vaibhav Gumashta <vgumas...@hortonworks.com> Authored: Wed Apr 11 15:39:30 2018 -0700 Committer: Vaibhav Gumashta <vgumas...@hortonworks.com> Committed: Wed Apr 11 15:39:30 2018 -0700 ---------------------------------------------------------------------- .../hive/metastore/cache/CachedStore.java | 198 +++++++++++-------- 1 file changed, 114 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b3fe6522/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index c47856d..1ce86bb 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -18,23 +18,21 @@ package org.apache.hadoop.hive.metastore.cache; -import java.io.Closeable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.EmptyStackException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Stack; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -100,7 +98,6 @@ import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; import org.apache.hadoop.hive.metastore.api.SchemaVersion; import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor; import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.api.Type; @@ -146,6 +143,7 @@ public class CachedStore implements RawStore, Configurable { // Time after which metastore cache is updated from metastore DB by the background update thread private static long cacheRefreshPeriodMS = DEFAULT_CACHE_REFRESH_PERIOD; private static AtomicBoolean isCachePrewarmed = new AtomicBoolean(false); + private static TablesPendingPrewarm tblsPendingPrewarm = new TablesPendingPrewarm(); private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -153,10 +151,6 @@ public class CachedStore implements RawStore, Configurable { static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); - public CachedStore() { - - } - @Override public void setConf(Configuration conf) { setConfInternal(conf); @@ -211,12 +205,13 @@ public class CachedStore implements RawStore, Configurable { Collection<String> catalogsToCache; try { catalogsToCache = catalogsToCache(rawStore); - LOG.info("Going to cache catalogs: " + - org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); + LOG.info("Going to cache catalogs: " + + org.apache.commons.lang.StringUtils.join(catalogsToCache, ", ")); List<Catalog> catalogs = new ArrayList<>(catalogsToCache.size()); - for (String catName : catalogsToCache) catalogs.add(rawStore.getCatalog(catName)); + for (String catName : catalogsToCache) + catalogs.add(rawStore.getCatalog(catName)); sharedCache.populateCatalogsInCache(catalogs); - } catch (MetaException|NoSuchObjectException e) { + } catch (MetaException | NoSuchObjectException e) { LOG.warn("Failed to populate catalogs in cache, going to try again", e); // try again continue; @@ -232,8 +227,8 @@ public class CachedStore implements RawStore, Configurable { databases.add(rawStore.getDatabase(catName, dbName)); } catch (NoSuchObjectException e) { // Continue with next database - LOG.warn("Failed to cache database " + - Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e); + LOG.warn("Failed to cache database " + + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on", e); } } } catch (MetaException e) { @@ -251,87 +246,92 @@ public class CachedStore implements RawStore, Configurable { try { tblNames = rawStore.getAllTables(catName, dbName); } catch (MetaException e) { - LOG.warn("Failed to cache tables for database " + - Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on"); + LOG.warn("Failed to cache tables for database " + + Warehouse.getCatalogQualifiedDbName(catName, dbName) + ", moving on"); // Continue with next database continue; } + tblsPendingPrewarm.addTableNamesForPrewarming(tblNames); + int totalTablesToCache = tblNames.size(); int numberOfTablesCachedSoFar = 0; - for (String tblName : tblNames) { - tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(catName, dbName, tblName)) { - continue; - - } - Table table; + while (tblsPendingPrewarm.hasMoreTablesToPrewarm()) { try { - table = rawStore.getTable(catName, dbName, tblName); - } catch (MetaException e) { - LOG.warn("Failed cache table " + - Warehouse.getCatalogQualifiedTableName(catName, dbName, tblName) + - ", moving on"); - // It is possible the table is deleted during fetching tables of the database, - // in that case, continue with the next table - continue; - } - List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); - try { - ColumnStatistics tableColStats = null; - List<Partition> partitions = null; - List<ColumnStatistics> partitionColStats = null; - AggrStats aggrStatsAllPartitions = null; - AggrStats aggrStatsAllButDefaultPartition = null; - if (table.isSetPartitionKeys()) { - Deadline.startTimer("getPartitions"); - partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - List<String> partNames = new ArrayList<>(partitions.size()); - for (Partition p : partitions) { - partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); - } - if (!partNames.isEmpty()) { - // Get partition column stats for this table - Deadline.startTimer("getPartitionColumnStatistics"); - partitionColStats = - rawStore.getPartitionColumnStatistics(catName, dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Get aggregate stats for all partitions of a table and for all but default - // partition - Deadline.startTimer("getAggrPartitionColumnStatistics"); - aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + String tblName = + StringUtils.normalizeIdentifier(tblsPendingPrewarm.getNextTableNameToPrewarm()); + if (!shouldCacheTable(catName, dbName, tblName)) { + continue; + } + Table table; + try { + table = rawStore.getTable(catName, dbName, tblName); + } catch (MetaException e) { + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + continue; + } + List<String> colNames = MetaStoreUtils.getColumnNamesForTable(table); + try { + ColumnStatistics tableColStats = null; + List<Partition> partitions = null; + List<ColumnStatistics> partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; + if (table.isSetPartitionKeys()) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(catName, dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate - // stats again - List<FieldSchema> partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List<String> partCols = new ArrayList<>(); - List<String> partVals = new ArrayList<>(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); + List<String> partNames = new ArrayList<>(partitions.size()); + for (Partition p : partitions) { + partNames.add(Warehouse.makePartName(table.getPartitionKeys(), p.getValues())); + } + if (!partNames.isEmpty()) { + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = rawStore.getPartitionColumnStatistics(catName, dbName, + tblName, partNames, colNames); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default + // partition + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate + // stats again + List<FieldSchema> partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List<String> partCols = new ArrayList<>(); + List<String> partVals = new ArrayList<>(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggrPartitionColumnStatistics"); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + Deadline.stopTimer(); } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggrPartitionColumnStatistics"); - aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(catName, dbName, tblName, partNames, colNames); + } else { + Deadline.startTimer("getTableColumnStatistics"); + tableColStats = + rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); Deadline.stopTimer(); } - } else { - Deadline.startTimer("getTableColumnStatistics"); - tableColStats = rawStore.getTableColumnStatistics(catName, dbName, tblName, colNames); - Deadline.stopTimer(); + sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + } catch (MetaException | NoSuchObjectException e) { + // Continue with next table + continue; } - sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, - aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); - } catch (MetaException | NoSuchObjectException e) { - // Continue with next table + LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, ++numberOfTablesCachedSoFar, totalTablesToCache); + } catch (EmptyStackException e) { + // We've prewarmed this database, continue with the next one continue; } - LOG.debug("Processed database: {}'s table: {}. Cached {} / {} tables so far.", dbName, - tblName, ++numberOfTablesCachedSoFar, tblNames.size()); } LOG.debug("Processed database: {}. Cached {} / {} databases so far.", dbName, ++numberOfDatabasesCachedSoFar, databases.size()); @@ -344,6 +344,32 @@ public class CachedStore implements RawStore, Configurable { sharedCache.completeTableCachePrewarm(); } + static class TablesPendingPrewarm { + private Stack<String> tableNames = new Stack<>(); + + private synchronized void addTableNamesForPrewarming(List<String> tblNames) { + tableNames.clear(); + if (tblNames != null) { + tableNames.addAll(tblNames); + } + } + + private synchronized boolean hasMoreTablesToPrewarm() { + return !tableNames.empty(); + } + + private synchronized String getNextTableNameToPrewarm() { + return tableNames.pop(); + } + + private synchronized void prioritizeTableForPrewarm(String tblName) { + // If the table is in the pending prewarm list, move it to the top + if (tableNames.remove(tblName)) { + tableNames.push(tblName); + } + } + } + @VisibleForTesting static void setCachePrewarmedState(boolean state) { isCachePrewarmed.set(state); @@ -830,6 +856,10 @@ public class CachedStore implements RawStore, Configurable { Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); if (tbl == null) { // This table is not yet loaded in cache + // If the prewarm thread is working on this table's database, + // let's move this table to the top of tblNamesBeingPrewarmed stack, + // so that it gets loaded to the cache faster and is available for subsequent requests + tblsPendingPrewarm.prioritizeTableForPrewarm(tblName); return rawStore.getTable(catName, dbName, tblName); } if (tbl != null) {