Repository: spark
Updated Branches:
  refs/heads/master a83ae0d9b -> 4dfd746de


[SPARK-23896][SQL] Improve PartitioningAwareFileIndex

## What changes were proposed in this pull request?

Currently `PartitioningAwareFileIndex` accepts an optional parameter
`userPartitionSchema`. If provided, the index combines the inferred partition
schema with it to provide types for the discovered partition columns.

However, constructing that parameter requires extra work from the caller:
1. To get `userPartitionSchema`, we need to combine the inferred partition
   schema with `userSpecifiedSchema`.
2. To get the inferred partition schema, we have to create a temporary file
   index first.

Only after both steps can the final `PartitioningAwareFileIndex` be created
(the old flow is sketched below).
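
For illustration, here is a minimal sketch of the old two-step flow against the
pre-patch `InMemoryFileIndex` constructor. The helper `oldFlow` and the
case-insensitive name match are only illustrative (the real code uses the
session's resolver); this is not the exact `DataSource` code.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.StructType

// Hypothetical helper showing the two-step dance before this patch.
def oldFlow(
    spark: SparkSession,
    rootPaths: Seq[Path],
    options: Map[String, String],
    userSpecifiedSchema: Option[StructType]): InMemoryFileIndex = {
  // Step 2 first: build a throwaway index just to infer the partition schema.
  val tempIndex = new InMemoryFileIndex(spark, rootPaths, options, partitionSchema = None)
  // Step 1: overlay the user-specified types onto the inferred partition columns.
  val userPartitionSchema = StructType(tempIndex.partitionSchema.map { inferred =>
    userSpecifiedSchema
      .flatMap(_.find(_.name.equalsIgnoreCase(inferred.name)))
      .getOrElse(inferred)
  })
  // Only now can the index the query will actually use be constructed,
  // re-parsing the partition directories a second time.
  new InMemoryFileIndex(spark, rootPaths, options, Some(userPartitionSchema))
}
```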

This can be simplified by passing `userSpecifiedSchema` directly to
`PartitioningAwareFileIndex` and letting the index resolve the partition
schema itself.

With this change we remove redundant code and avoid inferring the file
partitioning twice; the new flow is sketched below.
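
A corresponding sketch of the new flow, assuming the post-patch constructor
shown in the diff below (the helper `newFlow` is again hypothetical):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.StructType

// Hypothetical helper: one index, one round of listing and partition inference.
// The index itself now resolves the partition schema against `userSpecifiedSchema`.
def newFlow(
    spark: SparkSession,
    rootPaths: Seq[Path],
    options: Map[String, String],
    userSpecifiedSchema: Option[StructType]): InMemoryFileIndex = {
  new InMemoryFileIndex(spark, rootPaths, options, userSpecifiedSchema)
}
```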

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.w...@databricks.com>

Closes #21004 from gengliangwang/PartitioningAwareFileIndex.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dfd746d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dfd746d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dfd746d

Branch: refs/heads/master
Commit: 4dfd746de3f4346ed0c2191f8523a7e6cc9f064d
Parents: a83ae0d
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Sat Apr 14 00:22:38 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Apr 14 00:22:38 2018 +0800

----------------------------------------------------------------------
 .../datasources/CatalogFileIndex.scala          |   2 +-
 .../sql/execution/datasources/DataSource.scala  | 133 ++++++++-----------
 .../datasources/InMemoryFileIndex.scala         |   8 +-
 .../PartitioningAwareFileIndex.scala            |  54 +++++---
 .../streaming/MetadataLogFileIndex.scala        |  10 +-
 .../datasources/FileSourceStrategySuite.scala   |   2 +-
 .../hive/PartitionedTablePerfStatsSuite.scala   |   2 +-
 7 files changed, 103 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 4046396..a66a076 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -85,7 +85,7 @@ class CatalogFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
     } else {
       new InMemoryFileIndex(
-        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
+        sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index b84ea76..f16d824 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -104,24 +103,6 @@ case class DataSource(
   }
 
   /**
-   * In the read path, only managed tables by Hive provide the partition columns properly when
-   * initializing this class. All other file based data sources will try to infer the partitioning,
-   * and then cast the inferred types to user specified dataTypes if the partition columns exist
-   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
-   * inconsistent data types as reported in SPARK-21463.
-   * @param fileIndex A FileIndex that will perform partition inference
-   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
-   */
-  private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
-    val resolved = fileIndex.partitionSchema.map { partitionField =>
-      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
-      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
-        partitionField)
-    }
-    StructType(resolved)
-  }
-
-  /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
    * initializing this class. All other file based data sources will try to infer the partitioning,
@@ -140,31 +121,26 @@ case class DataSource(
    *     be any further inference in any triggers.
    *
    * @param format the file format object for this DataSource
-   * @param fileStatusCache the shared cache for file statuses to speed up listing
+   * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list
    * @return A pair of the data schema (excluding partition columns) and the schema of the partition
    *         columns.
    */
   private def getOrInferFileFormatSchema(
       format: FileFormat,
-      fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
-    // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
+      fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = {
+    // The operations below are expensive therefore try not to do them if we don't need to, e.g.,
     // in streaming mode, we have already inferred and registered partition columns, we will
     // never have to materialize the lazy val below
-    lazy val tempFileIndex = {
-      val allPaths = caseInsensitiveOptions.get("path") ++ paths
-      val hadoopConf = sparkSession.sessionState.newHadoopConf()
-      val globbedPaths = allPaths.toSeq.flatMap { path =>
-        val hdfsPath = new Path(path)
-        val fs = hdfsPath.getFileSystem(hadoopConf)
-        val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-      }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
+    lazy val tempFileIndex = fileIndex.getOrElse {
+      val globbedPaths =
+        checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false)
+      createInMemoryFileIndex(globbedPaths)
     }
+
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
+      tempFileIndex.partitionSchema
     } else {
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
@@ -356,13 +332,7 @@ case class DataSource(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
             sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
-        val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
-          val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
-          new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
-        } else {
-          tempFileCatalog
-        }
+        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -384,24 +354,23 @@ case class DataSource(
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
-        val allPaths = caseInsensitiveOptions.get("path") ++ paths
-        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap(
-          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
-
-        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
-        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
-
-        val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
-            catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
+        val globbedPaths =
+          checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
+        val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
+          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
+          catalogTable.get.partitionColumnNames.nonEmpty
+        val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
           val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
-          new CatalogFileIndex(
+          val index = new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
+          (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
         } else {
-          new InMemoryFileIndex(
-            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
+          val index = createInMemoryFileIndex(globbedPaths)
+          val (resultDataSchema, resultPartitionSchema) =
+            getOrInferFileFormatSchema(format, Some(index))
+          (index, resultDataSchema, resultPartitionSchema)
         }
 
         HadoopFsRelation(
@@ -552,6 +521,40 @@ case class DataSource(
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
   }
+
+  /** Returns an [[InMemoryFileIndex]] that can be used to get partition schema and file list. */
+  private def createInMemoryFileIndex(globbedPaths: Seq[Path]): InMemoryFileIndex = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(
+      sparkSession, globbedPaths, options, userSpecifiedSchema, fileStatusCache)
+  }
+
+  /**
+   * Checks and returns files in all the paths.
+   */
+  private def checkAndGlobPathIfNecessary(
+      checkEmptyGlobPath: Boolean,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val allPaths = caseInsensitiveOptions.get("path") ++ paths
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    allPaths.flatMap { path =>
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(hadoopConf)
+      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+      if (checkEmptyGlobPath && globPath.isEmpty) {
+        throw new AnalysisException(s"Path does not exist: $qualified")
+      }
+
+      // Sufficient to check head of the globPath seq for non-glob scenario
+      // Don't need to check once again if files exist in streaming mode
+      if (checkFilesExist && !fs.exists(globPath.head)) {
+        throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+      }
+      globPath
+    }.toSeq
+  }
 }
 
 object DataSource extends Logging {
@@ -700,30 +703,6 @@ object DataSource extends Logging {
   }
 
   /**
-   * If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
-   * If `checkFilesExist` is `true`, also check the file existence.
-   */
-  private def checkAndGlobPathIfNecessary(
-      hadoopConf: Configuration,
-      path: String,
-      checkFilesExist: Boolean): Seq[Path] = {
-    val hdfsPath = new Path(path)
-    val fs = hdfsPath.getFileSystem(hadoopConf)
-    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
-
-    if (globPath.isEmpty) {
-      throw new AnalysisException(s"Path does not exist: $qualified")
-    }
-    // Sufficient to check head of the globPath seq for non-glob scenario
-    // Don't need to check once again if files exist in streaming mode
-    if (checkFilesExist && !fs.exists(globPath.head)) {
-      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-    }
-    globPath
-  }
-
-  /**
    * Called before writing into a FileFormat based data source to make sure the
    * supplied schema is not empty.
    * @param schema

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 318ada0..739d1f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -41,17 +41,17 @@ import org.apache.spark.util.SerializableConfiguration
  * @param rootPathsSpecified the list of root table paths to scan (some of which might be
  *                           filtered out later)
  * @param parameters as set of options to control discovery
- * @param partitionSchema an optional partition schema that will be use to provide types for the
- *                        discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
     rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
-    partitionSchema: Option[StructType],
+    userSpecifiedSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
-    sparkSession, parameters, partitionSchema, fileStatusCache) {
+    sparkSession, parameters, userSpecifiedSchema, fileStatusCache) {
 
   // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
   // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 6b6f638..cc8af7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.{StringType, StructType}
  * It provides the necessary methods to parse partition data based on a set of files.
  *
  * @param parameters as set of options to control partition discovery
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- *                            the discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 abstract class PartitioningAwareFileIndex(
     sparkSession: SparkSession,
     parameters: Map[String, String],
-    userPartitionSchema: Option[StructType],
+    userSpecifiedSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
   import PartitioningAwareFileIndex.BASE_PATH_PARAM
 
@@ -126,35 +126,32 @@ abstract class PartitioningAwareFileIndex(
     val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
     val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
       .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)
-
-    userPartitionSchema match {
+    val inferredPartitionSpec = PartitioningUtils.parsePartitions(
+      leafDirs,
+      typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
+      basePaths = basePaths,
+      timeZoneId = timeZoneId)
+    userSpecifiedSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
-        val spec = PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = false,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        val userPartitionSchema =
+          combineInferredAndUserSpecifiedPartitionSchema(inferredPartitionSpec)
 
-        // Without auto inference, all of value in the `row` should be null or in StringType,
         // we need to cast into the data type that user specified.
         def castPartitionValuesToUserSchema(row: InternalRow) = {
           InternalRow((0 until row.numFields).map { i =>
+            val dt = inferredPartitionSpec.partitionColumns.fields(i).dataType
             Cast(
-              Literal.create(row.getUTF8String(i), StringType),
-              userProvidedSchema.fields(i).dataType,
+              Literal.create(row.get(i, dt), dt),
+              userPartitionSchema.fields(i).dataType,
               Option(timeZoneId)).eval()
           }: _*)
         }
 
-        PartitionSpec(userProvidedSchema, spec.partitions.map { part =>
+        PartitionSpec(userPartitionSchema, inferredPartitionSpec.partitions.map { part =>
           part.copy(values = castPartitionValuesToUserSchema(part.values))
         })
       case _ =>
-        PartitioningUtils.parsePartitions(
-          leafDirs,
-          typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
-          basePaths = basePaths,
-          timeZoneId = timeZoneId)
+        inferredPartitionSpec
     }
   }
 
@@ -236,6 +233,25 @@ abstract class PartitioningAwareFileIndex(
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
+
+  /**
+   * In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
+   * inconsistent data types as reported in SPARK-21463.
+   * @param spec A partition inference result
+   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
+   */
+  private def combineInferredAndUserSpecifiedPartitionSchema(spec: PartitionSpec): StructType = {
+    val equality = sparkSession.sessionState.conf.resolver
+    val resolved = spec.partitionColumns.map { partitionField =>
+      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+        partitionField)
+    }
+    StructType(resolved)
+  }
 }
 
 object PartitioningAwareFileIndex {

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index 1da703c..5cacdd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -30,14 +30,14 @@ import org.apache.spark.sql.types.StructType
  * A [[FileIndex]] that generates the list of files to processing by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
  *
- * @param userPartitionSchema an optional partition schema that will be use to provide types for
- *                            the discovered partitions
+ * @param userSpecifiedSchema an optional user specified schema that will be use to provide
+ *                            types for the discovered partitions
  */
 class MetadataLogFileIndex(
     sparkSession: SparkSession,
     path: Path,
-    userPartitionSchema: Option[StructType])
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
+    userSpecifiedSchema: Option[StructType])
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
@@ -51,7 +51,7 @@ class MetadataLogFileIndex(
   }
 
   override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
-    allFilesFromLog.toArray.groupBy(_.getPath.getParent)
+    allFilesFromLog.groupBy(_.getPath.getParent)
   }
 
   override def rootPaths: Seq[Path] = path :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c1d61b8..8764f0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -401,7 +401,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           sparkSession = spark,
           rootPathsSpecified = Seq(new Path(tempDir)),
           parameters = Map.empty[String, String],
-          partitionSchema = None)
+          userSpecifiedSchema = None)
         // This should not fail.
         fileCatalog.listLeafFiles(Seq(new Path(tempDir)))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 1a86c60..3af163a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -419,7 +419,7 @@ class PartitionedTablePerfStatsSuite
       HiveCatalogMetrics.reset()
       spark.read.load(dir.getAbsolutePath)
       assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 1)
-      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 1)
+      assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
     }
   }
 }

