Repository: spark Updated Branches: refs/heads/master ab3363e9f -> 3eca283ac
[SPARK-17994][SQL] Add back a file status cache for catalog tables ## What changes were proposed in this pull request? In SPARK-16980, we removed the full in-memory cache of table partitions in favor of loading only needed partitions from the metastore. This greatly improves the initial latency of queries that only read a small fraction of table partitions. However, since the metastore does not store file statistics, we need to discover those from remote storage. With the loss of the in-memory file status cache this has to happen on each query, increasing the latency of repeated queries over the same partitions. The proposal is to add back a per-table cache of partition contents, i.e. Map[Path, Array[FileStatus]]. This cache would be retained per-table, and can be invalidated through refreshTable() and refreshByPath(). Unlike the prior cache, it can be incrementally updated as new partitions are read. ## How was this patch tested? Existing tests and new tests in `HiveTablePerfStatsSuite`. cc mallman Author: Eric Liang <e...@databricks.com> Author: Michael Allman <mich...@videoamp.com> Author: Eric Liang <ekhli...@gmail.com> Closes #15539 from ericl/meta-cache. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eca283a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eca283a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eca283a Branch: refs/heads/master Commit: 3eca283aca68ac81c127d60ad5699f854d5f14b7 Parents: ab3363e Author: Eric Liang <e...@databricks.com> Authored: Sat Oct 22 22:08:28 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Sat Oct 22 22:08:28 2016 +0800 ---------------------------------------------------------------------- .../spark/metrics/source/StaticSources.scala | 7 + .../execution/datasources/FileStatusCache.scala | 149 ++++++++++++ .../datasources/ListingFileCatalog.scala | 13 +- .../PartitioningAwareFileCatalog.scala | 115 +++++---- .../datasources/TableFileCatalog.scala | 36 +-- .../org/apache/spark/sql/internal/SQLConf.scala | 16 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/HiveDDLCommandSuite.scala | 16 +- .../spark/sql/hive/HiveDataFrameSuite.scala | 145 ----------- .../sql/hive/HiveTablePerfStatsSuite.scala | 240 +++++++++++++++++++ 10 files changed, 514 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index cf92a10..b54885b 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -81,14 +81,21 @@ object HiveCatalogMetrics extends Source { val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered")) /** + * Tracks the total number of files served from the file status cache instead of discovered. + */ + val METRIC_FILE_CACHE_HITS = metricRegistry.counter(MetricRegistry.name("fileCacheHits")) + + /** * Resets the values of all metrics to zero. This is useful in tests. 
*/ def reset(): Unit = { METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount()) METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount()) + METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount()) } // clients can use these to avoid classloader issues with the codahale classes def incrementFetchedPartitions(n: Int): Unit = METRIC_PARTITIONS_FETCHED.inc(n) def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n) + def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n) } http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala new file mode 100644 index 0000000..e0ec748 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.JavaConverters._ + +import com.google.common.cache._ +import org.apache.hadoop.fs.{FileStatus, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{SerializableConfiguration, SizeEstimator} + +/** + * A cache of the leaf files of partition directories. We cache these files in order to speed + * up iterated queries over the same set of partitions. Otherwise, each query would have to + * hit remote storage in order to gather file statistics for physical planning. + * + * Each resolved catalog table has its own FileStatusCache. When the backing relation for the + * table is refreshed via refreshTable() or refreshByPath(), this cache will be invalidated. + */ +abstract class FileStatusCache { + /** + * @return the leaf files for the specified path from this cache, or None if not cached. + */ + def getLeafFiles(path: Path): Option[Array[FileStatus]] = None + + /** + * Saves the given set of leaf files for a path in this cache. + */ + def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit + + /** + * Invalidates all data held by this cache. + */ + def invalidateAll(): Unit +} + +object FileStatusCache { + private var sharedCache: SharedInMemoryCache = null + + /** + * @return a new FileStatusCache based on session configuration. Cache memory quota is + * shared across all clients. 
+ */ + def newCache(session: SparkSession): FileStatusCache = { + synchronized { + if (session.sqlContext.conf.filesourcePartitionPruning && + session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) { + if (sharedCache == null) { + sharedCache = new SharedInMemoryCache( + session.sqlContext.conf.filesourcePartitionFileCacheSize) + } + sharedCache.getForNewClient() + } else { + NoopCache + } + } + } + + def resetForTesting(): Unit = synchronized { + sharedCache = null + } +} + +/** + * An implementation that caches partition file statuses in memory. + * + * @param maxSizeInBytes max allowable cache size before entries start getting evicted + */ +private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging { + import FileStatusCache._ + + // Opaque object that uniquely identifies a shared cache user + private type ClientId = Object + + private val warnedAboutEviction = new AtomicBoolean(false) + + // we use a composite cache key in order to distinguish entries inserted by different clients + private val cache: Cache[(ClientId, Path), Array[FileStatus]] = CacheBuilder.newBuilder() + .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] { + override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { + (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt + }}) + .removalListener(new RemovalListener[(ClientId, Path), Array[FileStatus]]() { + override def onRemoval(removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]) = { + if (removed.getCause() == RemovalCause.SIZE && + warnedAboutEviction.compareAndSet(false, true)) { + logWarning( + "Evicting cached table partition metadata from memory due to size constraints " + + "(spark.sql.hive.filesourcePartitionFileCacheSize = " + maxSizeInBytes + " bytes). " + + "This may impact query planning performance.") + } + }}) + .maximumWeight(maxSizeInBytes) + .build() + + /** + * @return a FileStatusCache that does not share any entries with any other client, but does + * share memory resources for the purpose of cache eviction. + */ + def getForNewClient(): FileStatusCache = new FileStatusCache { + val clientId = new Object() + + override def getLeafFiles(path: Path): Option[Array[FileStatus]] = { + Option(cache.getIfPresent((clientId, path))) + } + + override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = { + cache.put((clientId, path), leafFiles.toArray) + } + + override def invalidateAll(): Unit = { + cache.asMap.asScala.foreach { case (key, value) => + if (key._1 == clientId) { + cache.invalidate(key) + } + } + } + } +} + +/** + * A non-caching implementation used when partition file status caching is disabled. 
+ */ +object NoopCache extends FileStatusCache { + override def getLeafFiles(path: Path): Option[Array[FileStatus]] = None + override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = {} + override def invalidateAll(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala index 6d10501..d9d5883 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala @@ -38,14 +38,16 @@ class ListingFileCatalog( sparkSession: SparkSession, override val rootPaths: Seq[Path], parameters: Map[String, String], - partitionSchema: Option[StructType]) - extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) { + partitionSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache) + extends PartitioningAwareFileCatalog( + sparkSession, parameters, partitionSchema, fileStatusCache) { @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _ @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _ @volatile private var cachedPartitionSpec: PartitionSpec = _ - refresh() + refresh0() override def partitionSpec(): PartitionSpec = { if (cachedPartitionSpec == null) { @@ -64,6 +66,11 @@ class ListingFileCatalog( } override def refresh(): Unit = { + refresh0() + fileStatusCache.invalidateAll() + } + + private def refresh0(): Unit = { val files = listLeafFiles(rootPaths) cachedLeafFiles = new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f) http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 5c8eff7..9b1903c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.util.SerializableConfiguration - /** * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables. * It provides the necessary methods to parse partition data based on a set of files. @@ -45,7 +44,8 @@ import org.apache.spark.util.SerializableConfiguration abstract class PartitioningAwareFileCatalog( sparkSession: SparkSession, parameters: Map[String, String], - partitionSchema: Option[StructType]) extends FileCatalog with Logging { + partitionSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging { import PartitioningAwareFileCatalog.BASE_PATH_PARAM /** Returns the specification of the partitions inferred from the data. 
*/ @@ -238,15 +238,29 @@ abstract class PartitioningAwareFileCatalog( * This is publicly visible for testing. */ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val files = - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) - } else { - PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + val output = mutable.LinkedHashSet[FileStatus]() + val pathsToFetch = mutable.ArrayBuffer[Path]() + for (path <- paths) { + fileStatusCache.getLeafFiles(path) match { + case Some(files) => + HiveCatalogMetrics.incrementFileCacheHits(files.length) + output ++= files + case None => + pathsToFetch += path } - - HiveCatalogMetrics.incrementFilesDiscovered(files.size) - mutable.LinkedHashSet(files: _*) + } + val discovered = if (pathsToFetch.length >= + sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession) + } else { + PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf) + } + discovered.foreach { case (path, leafFiles) => + HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) + fileStatusCache.putLeafFiles(path, leafFiles.toArray) + output ++= leafFiles + } + output } } @@ -276,14 +290,14 @@ object PartitioningAwareFileCatalog extends Logging { */ private def listLeafFilesInSerial( paths: Seq[Path], - hadoopConf: Configuration): Seq[FileStatus] = { + hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = { // Dummy jobconf to get to the pathFilter defined in configuration val jobConf = new JobConf(hadoopConf, this.getClass) val filter = FileInputFormat.getInputPathFilter(jobConf) - paths.flatMap { path => + paths.map { path => val fs = path.getFileSystem(hadoopConf) - listLeafFiles0(fs, path, filter) + (path, listLeafFiles0(fs, path, filter)) } } @@ -294,7 +308,7 @@ object PartitioningAwareFileCatalog extends Logging { private def listLeafFilesInParallel( paths: Seq[Path], hadoopConf: Configuration, - sparkSession: SparkSession): Seq[FileStatus] = { + sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = { assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") @@ -306,47 +320,54 @@ object PartitioningAwareFileCatalog extends Logging { // in case of large #defaultParallelism. 
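The cache-aware listing above splits the requested partition paths into cache hits and misses, discovers only the misses (serially or in parallel, depending on the threshold), and writes the discovered statuses back so a repeated query over the same partitions never touches remote storage. A minimal, self-contained sketch of that control flow, using hypothetical stand-in types and helper parameters rather than the real Spark and Hadoop classes:

```scala
import scala.collection.mutable

object CacheAwareListingSketch {
  // Hypothetical stand-ins so the sketch compiles on its own; the real code
  // uses org.apache.hadoop.fs.Path and FileStatus.
  case class Path(value: String)
  case class FileStatus(path: Path, length: Long)

  // Serve what we can from the cache, discover only the missing paths, then
  // backfill the cache with whatever was discovered.
  def listWithCache(
      paths: Seq[Path],
      cachedLookup: Path => Option[Array[FileStatus]],
      discover: Seq[Path] => Seq[(Path, Seq[FileStatus])],
      cachePut: (Path, Array[FileStatus]) => Unit): mutable.LinkedHashSet[FileStatus] = {
    val output = mutable.LinkedHashSet[FileStatus]()
    val misses = mutable.ArrayBuffer[Path]()
    paths.foreach { path =>
      cachedLookup(path) match {
        case Some(files) => output ++= files // cache hit: no remote listing needed
        case None => misses += path          // cache miss: remember it for discovery
      }
    }
    discover(misses).foreach { case (path, files) =>
      cachePut(path, files.toArray)          // backfill so repeated queries stay cheap
      output ++= files
    }
    output
  }
}
```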
val numParallelism = Math.min(paths.size, 10000) - val statuses = sparkContext + val statusMap = sparkContext .parallelize(serializedPaths, numParallelism) .mapPartitions { paths => val hadoopConf = serializableConfiguration.value listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator - }.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) }.collect() - // Turn SerializableFileStatus back to Status - statuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), - blockLocations) + (new Path(path), statuses) } } http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala index fc08c37..31a01bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType * @param table the table's (unqualified) name * @param partitionSchema the schema of a partitioned table's partition columns * @param sizeInBytes the table's data size in bytes + * @param fileStatusCache optional cache implementation to use for file listing */ class TableFileCatalog( sparkSession: SparkSession, @@ -42,24 +43,21 @@ class TableFileCatalog( protected val hadoopConf = sparkSession.sessionState.newHadoopConf + private val fileStatusCache = 
FileStatusCache.newCache(sparkSession) + private val externalCatalog = sparkSession.sharedState.externalCatalog private val catalogTable = externalCatalog.getTable(db, table) private val baseLocation = catalogTable.storage.locationUri - // Populated on-demand by calls to cachedAllPartitions - private var cachedAllPartitions: ListingFileCatalog = null - override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { filterPartitions(filters).listFiles(Nil) } - override def refresh(): Unit = synchronized { - cachedAllPartitions = null - } + override def refresh(): Unit = fileStatusCache.invalidateAll() /** * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions @@ -68,14 +66,6 @@ class TableFileCatalog( * @param filters partition-pruning filters */ def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = { - if (filters.isEmpty) { - allPartitions - } else { - filterPartitions0(filters) - } - } - - private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog = { val parameters = baseLocation .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc)) .getOrElse(Map.empty) @@ -87,21 +77,13 @@ class TableFileCatalog( } val partitionSpec = PartitionSpec(schema, partitions) new PrunedTableFileCatalog( - sparkSession, new Path(baseLocation.get), partitionSpec) + sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec) case None => - new ListingFileCatalog(sparkSession, rootPaths, parameters, None) - } - } - - // Not used in the hot path of queries when metastore partition pruning is enabled - def allPartitions: ListingFileCatalog = synchronized { - if (cachedAllPartitions == null) { - cachedAllPartitions = filterPartitions0(Nil) + new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache) } - cachedAllPartitions } - override def inputFiles: Array[String] = allPartitions.inputFiles + override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles } /** @@ -114,9 +96,11 @@ class TableFileCatalog( private class PrunedTableFileCatalog( sparkSession: SparkSession, tableBasePath: Path, + fileStatusCache: FileStatusCache, override val partitionSpec: PartitionSpec) extends ListingFileCatalog( sparkSession, partitionSpec.partitions.map(_.path), Map.empty, - Some(partitionSpec.partitionColumns)) + Some(partitionSpec.partitionColumns), + fileStatusCache) http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ebf4fad..a6e2fa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -265,17 +265,27 @@ object SQLConf { val HIVE_METASTORE_PARTITION_PRUNING = SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning") .doc("When true, some predicates will be pushed down into the Hive metastore so that " + - "unmatching partitions can be eliminated earlier.") + "unmatching partitions can be eliminated earlier. 
This only affects Hive tables " + + "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " + + "HiveUtils.CONVERT_METASTORE_ORC for more information).") .booleanConf .createWithDefault(true) val HIVE_FILESOURCE_PARTITION_PRUNING = SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning") - .doc("When true, enable metastore partition pruning for file source tables as well. " + + .doc("When true, enable metastore partition pruning for filesource relations as well. " + "This is currently implemented for converted Hive tables only.") .booleanConf .createWithDefault(true) + val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE = + SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize") + .doc("When nonzero, enable caching of partition file metadata in memory. All table share " + + "a cache that can use up to specified num bytes for file metadata. This conf only " + + "applies if filesource partition pruning is also enabled.") + .longConf + .createWithDefault(250 * 1024 * 1024) + val OPTIMIZER_METADATA_ONLY = SQLConfigBuilder("spark.sql.optimizer.metadataOnly") .doc("When true, enable the metadata-only query optimization that use the table's metadata " + "to produce the partition columns instead of table scans. It applies when all the columns " + @@ -670,6 +680,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def filesourcePartitionPruning: Boolean = getConf(HIVE_FILESOURCE_PARTITION_PRUNING) + def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE) + def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT) def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY) http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c909eb5..4408933 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log if (lazyPruningEnabled) { catalog } else { - catalog.allPartitions + catalog.filterPartitions(Nil) // materialize all the partitions in memory } } val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index 8133749..d13e29b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) } - } + } + + test("table name with schema") { + // regression test for SPARK-11778 + 
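Operationally, the new behaviour hangs off two settings: spark.sql.hive.filesourcePartitionPruning and spark.sql.hive.filesourcePartitionFileCacheSize, and per the doc text above the cache only applies when pruning is enabled. A hedged sketch of wiring them up when building a session; the quota value and the REFRESH target below are placeholders, not recommendations or part of this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-status-cache-example")
  .enableHiveSupport()
  // prune partitions through the metastore instead of listing every partition
  .config("spark.sql.hive.filesourcePartitionPruning", "true")
  // shared byte quota for cached file statuses; 0 disables the cache entirely
  .config("spark.sql.hive.filesourcePartitionFileCacheSize", 512L * 1024 * 1024)
  .getOrCreate()

// If data changes underneath Spark, the cached listing for a table can be
// dropped explicitly (table name is hypothetical):
spark.sql("REFRESH TABLE some_db.some_table")
```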
spark.sql("create schema usrdb") + spark.sql("create table usrdb.test(c int)") + spark.read.table("usrdb.test") + spark.sql("drop table usrdb.test") + spark.sql("drop schema usrdb") + } + + test("SPARK-15887: hive-site.xml should be loaded") { + val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + assert(hiveClient.getConf("hive.in.test", "") == "true") + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala deleted file mode 100644 index 1552343..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io.File - -import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils - -class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { - test("table name with schema") { - // regression test for SPARK-11778 - spark.sql("create schema usrdb") - spark.sql("create table usrdb.test(c int)") - spark.read.table("usrdb.test") - spark.sql("drop table usrdb.test") - spark.sql("drop schema usrdb") - } - - test("SPARK-15887: hive-site.xml should be loaded") { - val hiveClient = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client - assert(hiveClient.getConf("hive.in.test", "") == "true") - } - - private def setupPartitionedTable(tableName: String, dir: File): Unit = { - spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write - .partitionBy("partCol1", "partCol2") - .mode("overwrite") - .parquet(dir.getAbsolutePath) - - spark.sql(s""" - |create external table $tableName (id long) - |partitioned by (partCol1 int, partCol2 int) - |stored as parquet - |location "${dir.getAbsolutePath}"""".stripMargin) - spark.sql(s"msck repair table $tableName") - } - - test("partitioned pruned table reports only selected files") { - assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") - withTable("test") { - withTempDir { dir => - setupPartitionedTable("test", dir) - val df = spark.sql("select * from test") - assert(df.count() == 5) - assert(df.inputFiles.length == 5) // unpruned - - val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4") - assert(df2.count() 
== 2) - assert(df2.inputFiles.length == 2) // pruned, so we have less files - - val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4") - assert(df3.count() == 2) - assert(df3.inputFiles.length == 2) - - val df4 = spark.sql("select * from test where partCol1 = 999") - assert(df4.count() == 0) - assert(df4.inputFiles.length == 0) - } - } - } - - test("lazy partition pruning reads only necessary partition data") { - withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") { - withTable("test") { - withTempDir { dir => - setupPartitionedTable("test", dir) - HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 = 999").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) - - HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 < 2").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) - - HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 < 3").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3) - - // should read all - HiveCatalogMetrics.reset() - spark.sql("select * from test").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) - - // read all should be cached - HiveCatalogMetrics.reset() - spark.sql("select * from test").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) - } - } - } - } - - test("all partitions read and cached when filesource partition pruning is off") { - withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") { - withTable("test") { - withTempDir { dir => - setupPartitionedTable("test", dir) - - // We actually query the partitions from hive each time the table is resolved in this - // mode. This is kind of terrible, but is needed to preserve the legacy behavior - // of doing plan cache validation based on the entire partition set. 
- HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 = 999").count() - // 5 from table resolution, another 5 from ListingFileCatalog - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) - - HiveCatalogMetrics.reset() - spark.sql("select * from test where partCol1 < 2").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) - - HiveCatalogMetrics.reset() - spark.sql("select * from test").count() - assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) - assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala new file mode 100644 index 0000000..82ee813 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.execution.datasources.FileStatusCache +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils + +class HiveTablePerfStatsSuite + extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach { + + override def beforeEach(): Unit = { + super.beforeEach() + FileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { + super.afterEach() + FileStatusCache.resetForTesting() + } + + private def setupPartitionedTable(tableName: String, dir: File): Unit = { + spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write + .partitionBy("partCol1", "partCol2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + + spark.sql(s""" + |create external table $tableName (id long) + |partitioned by (partCol1 int, partCol2 int) + |stored as parquet + |location "${dir.getAbsolutePath}"""".stripMargin) + spark.sql(s"msck repair table $tableName") + } + + test("partitioned pruned table reports only selected files") { + assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) == "true") + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + val df = spark.sql("select * from test") + assert(df.count() == 5) + assert(df.inputFiles.length == 5) // unpruned + + val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 = 4") + assert(df2.count() == 2) + assert(df2.inputFiles.length == 2) // pruned, so we have less files + + val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 = 4") + assert(df3.count() == 2) + assert(df3.inputFiles.length == 2) + + val df4 = spark.sql("select * from test where partCol1 = 999") + assert(df4.count() == 0) + assert(df4.inputFiles.length == 0) + } + } + } + + test("lazy partition pruning reads only necessary partition data") { + withSQLConf( + SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true", + SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 = 999").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 < 2").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + + HiveCatalogMetrics.reset() + spark.sql("select * from test where partCol1 < 3").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3) + + // should read all + HiveCatalogMetrics.reset() + spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + // read all should not be cached + HiveCatalogMetrics.reset() + spark.sql("select * from test").count() + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + // cache should be disabled + 
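Every check in this suite follows the same reset-query-assert pattern over the three Codahale counters exposed by HiveCatalogMetrics. A small helper one might add to such a suite (not part of this patch) to read counter deltas around a single action instead of calling reset() before every query:

```scala
import org.apache.spark.metrics.source.HiveCatalogMetrics

// Returns (partitions fetched, files discovered, file cache hits) incurred by `body`.
def countersDuring[T](body: => T): (Long, Long, Long) = {
  val partitions0 = HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount()
  val discovered0 = HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount()
  val cacheHits0 = HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount()
  body
  (HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() - partitions0,
    HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() - discovered0,
    HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() - cacheHits0)
}

// Example usage:
// val (partitions, discovered, cacheHits) = countersDuring {
//   spark.sql("select * from test where partCol1 < 3").count()
// }
```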
assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + } + } + } + } + + test("lazy partition pruning with file status caching enabled") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 = 999").count() == 0) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 < 2").count() == 2) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 < 3").count() == 3) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5) + } + } + } + } + + test("file status caching respects refresh table and refreshByPath") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + HiveCatalogMetrics.reset() + spark.sql("refresh table test") + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + + spark.catalog.cacheTable("test") + HiveCatalogMetrics.reset() + spark.catalog.refreshByPath(dir.getAbsolutePath) + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + } + } + } + } + + test("file status cache respects size limit") { + withSQLConf( + "spark.sql.hive.filesourcePartitionPruning" -> "true", + "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + assert(spark.sql("select * from 
test").count() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10) + assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0) + } + } + } + } + + test("all partitions read and cached when filesource partition pruning is off") { + withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") { + withTable("test") { + withTempDir { dir => + setupPartitionedTable("test", dir) + + // We actually query the partitions from hive each time the table is resolved in this + // mode. This is kind of terrible, but is needed to preserve the legacy behavior + // of doing plan cache validation based on the entire partition set. + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 = 999").count() == 0) + // 5 from table resolution, another 5 from ListingFileCatalog + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test where partCol1 < 2").count() == 2) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + + HiveCatalogMetrics.reset() + assert(spark.sql("select * from test").count() == 5) + assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5) + assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0) + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org