Repository: spark
Updated Branches:
  refs/heads/master ab3363e9f -> 3eca283ac


[SPARK-17994][SQL] Add back a file status cache for catalog tables

## What changes were proposed in this pull request?

In SPARK-16980, we removed the full in-memory cache of table partitions in 
favor of loading only needed partitions from the metastore. This greatly 
improves the initial latency of queries that only read a small fraction of 
table partitions.

However, since the metastore does not store file statistics, we need to 
discover those from remote storage. With the loss of the in-memory file status 
cache this has to happen on each query, increasing the latency of repeated 
queries over the same partitions.

The proposal is to add back a per-table cache of partition contents, i.e. 
Map[Path, Array[FileStatus]]. This cache would be retained per-table, and can 
be invalidated through refreshTable() and refreshByPath(). Unlike the prior 
cache, it can be incrementally updated as new partitions are read.

## How was this patch tested?

Existing tests and new tests in `HiveTablePerfStatsSuite`.

cc mallman

Author: Eric Liang <e...@databricks.com>
Author: Michael Allman <mich...@videoamp.com>
Author: Eric Liang <ekhli...@gmail.com>

Closes #15539 from ericl/meta-cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eca283a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eca283a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eca283a

Branch: refs/heads/master
Commit: 3eca283aca68ac81c127d60ad5699f854d5f14b7
Parents: ab3363e
Author: Eric Liang <e...@databricks.com>
Authored: Sat Oct 22 22:08:28 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Oct 22 22:08:28 2016 +0800

----------------------------------------------------------------------
 .../spark/metrics/source/StaticSources.scala    |   7 +
 .../execution/datasources/FileStatusCache.scala | 149 ++++++++++++
 .../datasources/ListingFileCatalog.scala        |  13 +-
 .../PartitioningAwareFileCatalog.scala          | 115 +++++----
 .../datasources/TableFileCatalog.scala          |  36 +--
 .../org/apache/spark/sql/internal/SQLConf.scala |  16 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  16 +-
 .../spark/sql/hive/HiveDataFrameSuite.scala     | 145 -----------
 .../sql/hive/HiveTablePerfStatsSuite.scala      | 240 +++++++++++++++++++
 10 files changed, 514 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala 
b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index cf92a10..b54885b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -81,14 +81,21 @@ object HiveCatalogMetrics extends Source {
   val METRIC_FILES_DISCOVERED = 
metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
 
   /**
+   * Tracks the total number of files served from the file status cache 
instead of discovered.
+   */
+  val METRIC_FILE_CACHE_HITS = 
metricRegistry.counter(MetricRegistry.name("fileCacheHits"))
+
+  /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
   def reset(): Unit = {
     METRIC_PARTITIONS_FETCHED.dec(METRIC_PARTITIONS_FETCHED.getCount())
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
+    METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale 
classes
   def incrementFetchedPartitions(n: Int): Unit = 
METRIC_PARTITIONS_FETCHED.inc(n)
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
+  def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
new file mode 100644
index 0000000..e0ec748
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.collection.JavaConverters._
+
+import com.google.common.cache._
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.{SerializableConfiguration, SizeEstimator}
+
+/**
+ * A cache of the leaf files of partition directories. We cache these files in 
order to speed
+ * up iterated queries over the same set of partitions. Otherwise, each query 
would have to
+ * hit remote storage in order to gather file statistics for physical planning.
+ *
+ * Each resolved catalog table has its own FileStatusCache. When the backing 
relation for the
+ * table is refreshed via refreshTable() or refreshByPath(), this cache will 
be invalidated.
+ */
+abstract class FileStatusCache {
+  /**
+   * @return the leaf files for the specified path from this cache, or None if 
not cached.
+   */
+  def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+
+  /**
+   * Saves the given set of leaf files for a path in this cache.
+   */
+  def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit
+
+  /**
+   * Invalidates all data held by this cache.
+   */
+  def invalidateAll(): Unit
+}
+
+object FileStatusCache {
+  private var sharedCache: SharedInMemoryCache = null
+
+  /**
+   * @return a new FileStatusCache based on session configuration. Cache 
memory quota is
+   *         shared across all clients.
+   */
+  def newCache(session: SparkSession): FileStatusCache = {
+    synchronized {
+      if (session.sqlContext.conf.filesourcePartitionPruning &&
+          session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
+        if (sharedCache == null) {
+          sharedCache = new SharedInMemoryCache(
+            session.sqlContext.conf.filesourcePartitionFileCacheSize)
+        }
+        sharedCache.getForNewClient()
+      } else {
+        NoopCache
+      }
+    }
+  }
+
+  def resetForTesting(): Unit = synchronized {
+    sharedCache = null
+  }
+}
+
+/**
+ * An implementation that caches partition file statuses in memory.
+ *
+ * @param maxSizeInBytes max allowable cache size before entries start getting 
evicted
+ */
+private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
+  import FileStatusCache._
+
+  // Opaque object that uniquely identifies a shared cache user
+  private type ClientId = Object
+
+  private val warnedAboutEviction = new AtomicBoolean(false)
+
+  // we use a composite cache key in order to distinguish entries inserted by 
different clients
+  private val cache: Cache[(ClientId, Path), Array[FileStatus]] = 
CacheBuilder.newBuilder()
+    .weigher(new Weigher[(ClientId, Path), Array[FileStatus]] {
+      override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int 
= {
+        (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)).toInt
+      }})
+    .removalListener(new RemovalListener[(ClientId, Path), 
Array[FileStatus]]() {
+      override def onRemoval(removed: RemovalNotification[(ClientId, Path), 
Array[FileStatus]]) = {
+        if (removed.getCause() == RemovalCause.SIZE &&
+            warnedAboutEviction.compareAndSet(false, true)) {
+          logWarning(
+            "Evicting cached table partition metadata from memory due to size 
constraints " +
+            "(spark.sql.hive.filesourcePartitionFileCacheSize = " + 
maxSizeInBytes + " bytes). " +
+            "This may impact query planning performance.")
+        }
+      }})
+    .maximumWeight(maxSizeInBytes)
+    .build()
+
+  /**
+   * @return a FileStatusCache that does not share any entries with any other 
client, but does
+   *         share memory resources for the purpose of cache eviction.
+   */
+  def getForNewClient(): FileStatusCache = new FileStatusCache {
+    val clientId = new Object()
+
+    override def getLeafFiles(path: Path): Option[Array[FileStatus]] = {
+      Option(cache.getIfPresent((clientId, path)))
+    }
+
+    override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit 
= {
+      cache.put((clientId, path), leafFiles.toArray)
+    }
+
+    override def invalidateAll(): Unit = {
+      cache.asMap.asScala.foreach { case (key, value) =>
+        if (key._1 == clientId) {
+          cache.invalidate(key)
+        }
+      }
+    }
+  }
+}
+
+/**
+ * A non-caching implementation used when partition file status caching is 
disabled.
+ */
+object NoopCache extends FileStatusCache {
+  override def getLeafFiles(path: Path): Option[Array[FileStatus]] = None
+  override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = 
{}
+  override def invalidateAll(): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 6d10501..d9d5883 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -38,14 +38,16 @@ class ListingFileCatalog(
     sparkSession: SparkSession,
     override val rootPaths: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType])
-  extends PartitioningAwareFileCatalog(sparkSession, parameters, 
partitionSchema) {
+    partitionSchema: Option[StructType],
+    fileStatusCache: FileStatusCache = NoopCache)
+  extends PartitioningAwareFileCatalog(
+    sparkSession, parameters, partitionSchema, fileStatusCache) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, 
FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, 
Array[FileStatus]] = _
   @volatile private var cachedPartitionSpec: PartitionSpec = _
 
-  refresh()
+  refresh0()
 
   override def partitionSpec(): PartitionSpec = {
     if (cachedPartitionSpec == null) {
@@ -64,6 +66,11 @@ class ListingFileCatalog(
   }
 
   override def refresh(): Unit = {
+    refresh0()
+    fileStatusCache.invalidateAll()
+  }
+
+  private def refresh0(): Unit = {
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => 
f.getPath -> f)

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 5c8eff7..9b1903c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
-
 /**
  * An abstract class that represents [[FileCatalog]]s that are aware of 
partitioned tables.
  * It provides the necessary methods to parse partition data based on a set of 
files.
@@ -45,7 +44,8 @@ import org.apache.spark.util.SerializableConfiguration
 abstract class PartitioningAwareFileCatalog(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    partitionSchema: Option[StructType]) extends FileCatalog with Logging {
+    partitionSchema: Option[StructType],
+    fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with 
Logging {
   import PartitioningAwareFileCatalog.BASE_PATH_PARAM
 
   /** Returns the specification of the partitions inferred from the data. */
@@ -238,15 +238,29 @@ abstract class PartitioningAwareFileCatalog(
    * This is publicly visible for testing.
    */
   def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
-    val files =
-      if (paths.length >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-        PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, 
hadoopConf, sparkSession)
-      } else {
-        PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf)
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val pathsToFetch = mutable.ArrayBuffer[Path]()
+    for (path <- paths) {
+      fileStatusCache.getLeafFiles(path) match {
+        case Some(files) =>
+          HiveCatalogMetrics.incrementFileCacheHits(files.length)
+          output ++= files
+        case None =>
+          pathsToFetch += path
       }
-
-    HiveCatalogMetrics.incrementFilesDiscovered(files.size)
-    mutable.LinkedHashSet(files: _*)
+    }
+    val discovered = if (pathsToFetch.length >=
+        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, 
hadoopConf, sparkSession)
+    } else {
+      PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, 
hadoopConf)
+    }
+    discovered.foreach { case (path, leafFiles) =>
+      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+      output ++= leafFiles
+    }
+    output
   }
 }
 
@@ -276,14 +290,14 @@ object PartitioningAwareFileCatalog extends Logging {
    */
   private def listLeafFilesInSerial(
       paths: Seq[Path],
-      hadoopConf: Configuration): Seq[FileStatus] = {
+      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
     // Dummy jobconf to get to the pathFilter defined in configuration
     val jobConf = new JobConf(hadoopConf, this.getClass)
     val filter = FileInputFormat.getInputPathFilter(jobConf)
 
-    paths.flatMap { path =>
+    paths.map { path =>
       val fs = path.getFileSystem(hadoopConf)
-      listLeafFiles0(fs, path, filter)
+      (path, listLeafFiles0(fs, path, filter))
     }
   }
 
@@ -294,7 +308,7 @@ object PartitioningAwareFileCatalog extends Logging {
   private def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkSession: SparkSession): Seq[FileStatus] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
     assert(paths.size >= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: 
${paths.mkString(", ")}")
 
@@ -306,47 +320,54 @@ object PartitioningAwareFileCatalog extends Logging {
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(paths.size, 10000)
 
-    val statuses = sparkContext
+    val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
       .mapPartitions { paths =>
         val hadoopConf = serializableConfiguration.value
         listLeafFilesInSerial(paths.map(new Path(_)).toSeq, 
hadoopConf).iterator
-      }.map { status =>
-        // Turn FileStatus into SerializableFileStatus so we can send it back 
to the driver
-        val blockLocations = status match {
-          case f: LocatedFileStatus =>
-            f.getBlockLocations.map { loc =>
-              SerializableBlockLocation(
-                loc.getNames,
-                loc.getHosts,
-                loc.getOffset,
-                loc.getLength)
-            }
-
-          case _ =>
-            Array.empty[SerializableBlockLocation]
-        }
+      }.map { case (path, statuses) =>
+        val serializableStatuses = statuses.map { status =>
+          // Turn FileStatus into SerializableFileStatus so we can send it 
back to the driver
+          val blockLocations = status match {
+            case f: LocatedFileStatus =>
+              f.getBlockLocations.map { loc =>
+                SerializableBlockLocation(
+                  loc.getNames,
+                  loc.getHosts,
+                  loc.getOffset,
+                  loc.getLength)
+              }
+
+            case _ =>
+              Array.empty[SerializableBlockLocation]
+          }
 
-        SerializableFileStatus(
-          status.getPath.toString,
-          status.getLen,
-          status.isDirectory,
-          status.getReplication,
-          status.getBlockSize,
-          status.getModificationTime,
-          status.getAccessTime,
-          blockLocations)
+          SerializableFileStatus(
+            status.getPath.toString,
+            status.getLen,
+            status.isDirectory,
+            status.getReplication,
+            status.getBlockSize,
+            status.getModificationTime,
+            status.getAccessTime,
+            blockLocations)
+        }
+        (path.toString, serializableStatuses)
       }.collect()
 
-    // Turn SerializableFileStatus back to Status
-    statuses.map { f =>
-      val blockLocations = f.blockLocations.map { loc =>
-        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+    // turn SerializableFileStatus back to Status
+    statusMap.map { case (path, serializableStatuses) =>
+      val statuses = serializableStatuses.map { f =>
+        val blockLocations = f.blockLocations.map { loc =>
+          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
+        }
+        new LocatedFileStatus(
+          new FileStatus(
+            f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime,
+            new Path(f.path)),
+          blockLocations)
       }
-      new LocatedFileStatus(
-        new FileStatus(
-          f.length, f.isDir, f.blockReplication, f.blockSize, 
f.modificationTime, new Path(f.path)),
-        blockLocations)
+      (new Path(path), statuses)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index fc08c37..31a01bc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType
  * @param table the table's (unqualified) name
  * @param partitionSchema the schema of a partitioned table's partition columns
  * @param sizeInBytes the table's data size in bytes
+ * @param fileStatusCache optional cache implementation to use for file listing
  */
 class TableFileCatalog(
     sparkSession: SparkSession,
@@ -42,24 +43,21 @@ class TableFileCatalog(
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
+  private val fileStatusCache = FileStatusCache.newCache(sparkSession)
+
   private val externalCatalog = sparkSession.sharedState.externalCatalog
 
   private val catalogTable = externalCatalog.getTable(db, table)
 
   private val baseLocation = catalogTable.storage.locationUri
 
-  // Populated on-demand by calls to cachedAllPartitions
-  private var cachedAllPartitions: ListingFileCatalog = null
-
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
   override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
     filterPartitions(filters).listFiles(Nil)
   }
 
-  override def refresh(): Unit = synchronized {
-    cachedAllPartitions = null
-  }
+  override def refresh(): Unit = fileStatusCache.invalidateAll()
 
   /**
    * Returns a [[ListingFileCatalog]] for this table restricted to the subset 
of partitions
@@ -68,14 +66,6 @@ class TableFileCatalog(
    * @param filters partition-pruning filters
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
-    if (filters.isEmpty) {
-      allPartitions
-    } else {
-      filterPartitions0(filters)
-    }
-  }
-
-  private def filterPartitions0(filters: Seq[Expression]): ListingFileCatalog 
= {
     val parameters = baseLocation
       .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
       .getOrElse(Map.empty)
@@ -87,21 +77,13 @@ class TableFileCatalog(
         }
         val partitionSpec = PartitionSpec(schema, partitions)
         new PrunedTableFileCatalog(
-          sparkSession, new Path(baseLocation.get), partitionSpec)
+          sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec)
       case None =>
-        new ListingFileCatalog(sparkSession, rootPaths, parameters, None)
-    }
-  }
-
-  // Not used in the hot path of queries when metastore partition pruning is 
enabled
-  def allPartitions: ListingFileCatalog = synchronized {
-    if (cachedAllPartitions == null) {
-      cachedAllPartitions = filterPartitions0(Nil)
+        new ListingFileCatalog(sparkSession, rootPaths, parameters, None, 
fileStatusCache)
     }
-    cachedAllPartitions
   }
 
-  override def inputFiles: Array[String] = allPartitions.inputFiles
+  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
 }
 
 /**
@@ -114,9 +96,11 @@ class TableFileCatalog(
 private class PrunedTableFileCatalog(
     sparkSession: SparkSession,
     tableBasePath: Path,
+    fileStatusCache: FileStatusCache,
     override val partitionSpec: PartitionSpec)
   extends ListingFileCatalog(
     sparkSession,
     partitionSpec.partitions.map(_.path),
     Map.empty,
-    Some(partitionSpec.partitionColumns))
+    Some(partitionSpec.partitionColumns),
+    fileStatusCache)

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ebf4fad..a6e2fa2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -265,17 +265,27 @@ object SQLConf {
   val HIVE_METASTORE_PARTITION_PRUNING =
     SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive 
metastore so that " +
-           "unmatching partitions can be eliminated earlier.")
+           "unmatching partitions can be eliminated earlier. This only affects 
Hive tables " +
+           "not converted to filesource relations (see 
HiveUtils.CONVERT_METASTORE_PARQUET and " +
+           "HiveUtils.CONVERT_METASTORE_ORC for more information).")
       .booleanConf
       .createWithDefault(true)
 
   val HIVE_FILESOURCE_PARTITION_PRUNING =
     SQLConfigBuilder("spark.sql.hive.filesourcePartitionPruning")
-      .doc("When true, enable metastore partition pruning for file source 
tables as well. " +
+      .doc("When true, enable metastore partition pruning for filesource 
relations as well. " +
            "This is currently implemented for converted Hive tables only.")
       .booleanConf
       .createWithDefault(true)
 
+  val HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE =
+    SQLConfigBuilder("spark.sql.hive.filesourcePartitionFileCacheSize")
+      .doc("When nonzero, enable caching of partition file metadata in memory. 
All table share " +
+           "a cache that can use up to specified num bytes for file metadata. 
This conf only " +
+           "applies if filesource partition pruning is also enabled.")
+      .longConf
+      .createWithDefault(250 * 1024 * 1024)
+
   val OPTIMIZER_METADATA_ONLY = 
SQLConfigBuilder("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the 
table's metadata " +
       "to produce the partition columns instead of table scans. It applies 
when all the columns " +
@@ -670,6 +680,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def filesourcePartitionPruning: Boolean = 
getConf(HIVE_FILESOURCE_PARTITION_PRUNING)
 
+  def filesourcePartitionFileCacheSize: Long = 
getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c909eb5..4408933 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -235,7 +235,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
           if (lazyPruningEnabled) {
             catalog
           } else {
-            catalog.allPartitions
+            catalog.filterPartitions(Nil)  // materialize all the partitions 
in memory
           }
         }
         val partitionSchemaColumnNames = 
partitionSchema.map(_.name.toLowerCase).toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 8133749..d13e29b 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -577,5 +577,19 @@ class HiveDDLCommandSuite extends PlanTest with 
SQLTestUtils with TestHiveSingle
       assert(output == 
Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
       assert(serde == 
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
     }
-   }
+  }
+
+  test("table name with schema") {
+    // regression test for SPARK-11778
+    spark.sql("create schema usrdb")
+    spark.sql("create table usrdb.test(c int)")
+    spark.read.table("usrdb.test")
+    spark.sql("drop table usrdb.test")
+    spark.sql("drop schema usrdb")
+  }
+
+  test("SPARK-15887: hive-site.xml should be loaded") {
+    val hiveClient = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+    assert(hiveClient.getConf("hive.in.test", "") == "true")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
deleted file mode 100644
index 1552343..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameSuite.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.File
-
-import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.test.SQLTestUtils
-
-class HiveDataFrameSuite extends QueryTest with TestHiveSingleton with 
SQLTestUtils {
-  test("table name with schema") {
-    // regression test for SPARK-11778
-    spark.sql("create schema usrdb")
-    spark.sql("create table usrdb.test(c int)")
-    spark.read.table("usrdb.test")
-    spark.sql("drop table usrdb.test")
-    spark.sql("drop schema usrdb")
-  }
-
-  test("SPARK-15887: hive-site.xml should be loaded") {
-    val hiveClient = 
spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
-    assert(hiveClient.getConf("hive.in.test", "") == "true")
-  }
-
-  private def setupPartitionedTable(tableName: String, dir: File): Unit = {
-    spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
-      .partitionBy("partCol1", "partCol2")
-      .mode("overwrite")
-      .parquet(dir.getAbsolutePath)
-
-    spark.sql(s"""
-      |create external table $tableName (id long)
-      |partitioned by (partCol1 int, partCol2 int)
-      |stored as parquet
-      |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
-  }
-
-  test("partitioned pruned table reports only selected files") {
-    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) 
== "true")
-    withTable("test") {
-      withTempDir { dir =>
-        setupPartitionedTable("test", dir)
-        val df = spark.sql("select * from test")
-        assert(df.count() == 5)
-        assert(df.inputFiles.length == 5)  // unpruned
-
-        val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 
= 4")
-        assert(df2.count() == 2)
-        assert(df2.inputFiles.length == 2)  // pruned, so we have less files
-
-        val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 
= 4")
-        assert(df3.count() == 2)
-        assert(df3.inputFiles.length == 2)
-
-        val df4 = spark.sql("select * from test where partCol1 = 999")
-        assert(df4.count() == 0)
-        assert(df4.inputFiles.length == 0)
-      }
-    }
-  }
-
-  test("lazy partition pruning reads only necessary partition data") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 = 999").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 < 2").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 < 3").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
-
-          // should read all
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
-          // read all should be cached
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-        }
-      }
-    }
-  }
-
-  test("all partitions read and cached when filesource partition pruning is 
off") {
-    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
-      withTable("test") {
-        withTempDir { dir =>
-          setupPartitionedTable("test", dir)
-
-          // We actually query the partitions from hive each time the table is 
resolved in this
-          // mode. This is kind of terrible, but is needed to preserve the 
legacy behavior
-          // of doing plan cache validation based on the entire partition set.
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 = 999").count()
-          // 5 from table resolution, another 5 from ListingFileCatalog
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test where partCol1 < 2").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-
-          HiveCatalogMetrics.reset()
-          spark.sql("select * from test").count()
-          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
-          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/3eca283a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
new file mode 100644
index 0000000..82ee813
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveTablePerfStatsSuite.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveTablePerfStatsSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with 
BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  private def setupPartitionedTable(tableName: String, dir: File): Unit = {
+    spark.range(5).selectExpr("id", "id as partCol1", "id as partCol2").write
+      .partitionBy("partCol1", "partCol2")
+      .mode("overwrite")
+      .parquet(dir.getAbsolutePath)
+
+    spark.sql(s"""
+      |create external table $tableName (id long)
+      |partitioned by (partCol1 int, partCol2 int)
+      |stored as parquet
+      |location "${dir.getAbsolutePath}"""".stripMargin)
+    spark.sql(s"msck repair table $tableName")
+  }
+
+  test("partitioned pruned table reports only selected files") {
+    assert(spark.sqlContext.getConf(HiveUtils.CONVERT_METASTORE_PARQUET.key) 
== "true")
+    withTable("test") {
+      withTempDir { dir =>
+        setupPartitionedTable("test", dir)
+        val df = spark.sql("select * from test")
+        assert(df.count() == 5)
+        assert(df.inputFiles.length == 5)  // unpruned
+
+        val df2 = spark.sql("select * from test where partCol1 = 3 or partCol2 
= 4")
+        assert(df2.count() == 2)
+        assert(df2.inputFiles.length == 2)  // pruned, so we have less files
+
+        val df3 = spark.sql("select * from test where PARTCOL1 = 3 or partcol2 
= 4")
+        assert(df3.count() == 2)
+        assert(df3.inputFiles.length == 2)
+
+        val df4 = spark.sql("select * from test where partCol1 = 999")
+        assert(df4.count() == 0)
+        assert(df4.inputFiles.length == 0)
+      }
+    }
+  }
+
+  test("lazy partition pruning reads only necessary partition data") {
+    withSQLConf(
+        SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "true",
+        SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key -> "0") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          spark.sql("select * from test where partCol1 = 999").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          spark.sql("select * from test where partCol1 < 2").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+
+          HiveCatalogMetrics.reset()
+          spark.sql("select * from test where partCol1 < 3").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 3)
+
+          // should read all
+          HiveCatalogMetrics.reset()
+          spark.sql("select * from test").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          // read all should not be cached
+          HiveCatalogMetrics.reset()
+          spark.sql("select * from test").count()
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          // cache should be disabled
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("lazy partition pruning with file status caching enabled") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 999").count() 
== 0)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 2").count() == 
2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 3").count() == 
3)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 3)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 2)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 2)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 3)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 5)
+        }
+      }
+    }
+  }
+
+  test("file status caching respects refresh table and refreshByPath") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "9999999") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          spark.sql("refresh table test")
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+
+          spark.catalog.cacheTable("test")
+          HiveCatalogMetrics.reset()
+          spark.catalog.refreshByPath(dir.getAbsolutePath)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("file status cache respects size limit") {
+    withSQLConf(
+        "spark.sql.hive.filesourcePartitionPruning" -> "true",
+        "spark.sql.hive.filesourcePartitionFileCacheSize" -> "1" /* 1 byte */) 
{
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 10)
+          assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+        }
+      }
+    }
+  }
+
+  test("all partitions read and cached when filesource partition pruning is 
off") {
+    withSQLConf(SQLConf.HIVE_FILESOURCE_PARTITION_PRUNING.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedTable("test", dir)
+
+          // We actually query the partitions from hive each time the table is 
resolved in this
+          // mode. This is kind of terrible, but is needed to preserve the 
legacy behavior
+          // of doing plan cache validation based on the entire partition set.
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 = 999").count() 
== 0)
+          // 5 from table resolution, another 5 from ListingFileCatalog
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test where partCol1 < 2").count() == 
2)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+
+          HiveCatalogMetrics.reset()
+          assert(spark.sql("select * from test").count() == 5)
+          assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 5)
+          assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to