This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 31488e1  [SPARK-27504][SQL] File source V2: support refreshing metadata cache
31488e1 is described below

commit 31488e1ca506efd34459e6bc9a08b6d0956c8d44
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Fri Apr 19 18:26:03 2019 +0800

    [SPARK-27504][SQL] File source V2: support refreshing metadata cache

    ## What changes were proposed in this pull request?

    In file source V1, if a file is deleted manually, reading the DataFrame/Table
    throws an exception with the suggestion message
    ```
    It is possible the underlying files have been updated. You can explicitly
    invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in
    SQL or by recreating the Dataset/DataFrame involved.
    ```
    After refreshing the table/DataFrame, reads return correct results. File source
    V2 should follow the same behavior.

    ## How was this patch tested?

    Unit test

    Closes #24401 from gengliangwang/refreshFileTable.

    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
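For context, a rough end-to-end sketch of the behavior this change brings to file source V2. The session setup, path, and view name below are illustrative assumptions, not taken from the patch:

```
// Minimal sketch, assuming a local session; the path and view name are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val path = "/tmp/refresh_demo"

// Write some ORC files and register a view over them; the view caches its file listing.
spark.range(0, 100, 1, 3).write.mode("overwrite").orc(path)
spark.read.orc(path).createOrReplaceTempView("demo_table")

// ... one ORC part file under `path` is deleted out of band ...

try {
  spark.sql("SELECT count(*) FROM demo_table").show()
} catch {
  case e: Exception =>
    // After this patch the V2 file source also surfaces the suggestion:
    // "... running 'REFRESH TABLE tableName' ... or by recreating the Dataset/DataFrame ..."
    println(e.getMessage)
}

// Refreshing the view invalidates the cached file listing, so the next read
// only scans the files that still exist.
spark.sql("REFRESH TABLE demo_table")
spark.sql("SELECT count(*) FROM demo_table").show()
```

Recreating the Dataset/DataFrame instead of running REFRESH TABLE has the same effect, since it rebuilds the cached file listing.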
---
 .../datasources/v2/DataSourceV2Relation.scala      |  5 +++
 .../datasources/v2/FilePartitionReader.scala       | 38 ++++++++++++----------
 .../org/apache/spark/sql/MetadataCacheSuite.scala  | 35 ++++++++++++++------
 3 files changed, 50 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4119957..e7e0be0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -66,6 +66,11 @@ case class DataSourceV2Relation(
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+
+  override def refresh(): Unit = table match {
+    case table: FileTable => table.fileIndex.refresh()
+    case _ => // Do nothing.
+  }
 }

 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 7c7b468..d4bad29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -34,25 +34,27 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
   override def next(): Boolean = {
     if (currentReader == null) {
       if (readers.hasNext) {
-        if (ignoreMissingFiles || ignoreCorruptFiles) {
-          try {
-            currentReader = getNextReader()
-          } catch {
-            case e: FileNotFoundException if ignoreMissingFiles =>
-              logWarning(s"Skipped missing file: $currentReader", e)
-              currentReader = null
-              return false
-            // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-            case e: FileNotFoundException if !ignoreMissingFiles => throw e
-            case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
-              logWarning(
-                s"Skipped the rest of the content in the corrupted file: $currentReader", e)
-              currentReader = null
-              InputFileBlockHolder.unset()
-              return false
-          }
-        } else {
+        try {
           currentReader = getNextReader()
+        } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: $currentReader", e)
+            currentReader = null
+            return false
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+          case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest of the content in the corrupted file: $currentReader", e)
+            currentReader = null
+            InputFileBlockHolder.unset()
+            return false
         }
       } else {
         return false
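The reader change above only rewords the failure when missing files are not being ignored; with spark.sql.files.ignoreMissingFiles enabled, the skip-and-log path is unchanged. A hedged sketch of the two outcomes against a DataFrame whose file listing has gone stale (the path and session setup are assumptions for illustration):

```
// Sketch only: assumes a local session and a hypothetical path.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
val path = "/tmp/refresh_demo"
spark.range(0, 100, 1, 3).write.mode("overwrite").orc(path)

val df = spark.read.orc(path)   // the file listing is captured by this DataFrame
// ... one ORC part file under `path` is deleted out of band, so the listing is stale ...

spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
df.count()   // the missing file is logged ("Skipped missing file: ...") and skipped

spark.conf.set("spark.sql.files.ignoreMissingFiles", "false")
df.count()   // fails; the underlying FileNotFoundException now carries the REFRESH TABLE hint
```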
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index 98aa447..664d59c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql

 import java.io.File

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext

 /**
  * Test suite to handle metadata cache related.
  */
-class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {

   /** Removes one data file in the given directory. */
   private def deleteOneFileInDirectory(dir: File): Unit = {
@@ -38,14 +38,15 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     oneFile.foreach(_.delete())
   }

-  test("SPARK-16336 Suggest doing table refresh when encountering FileNotFoundException") {
+  test("SPARK-16336,SPARK-27504 Suggest doing table refresh " +
+    "when encountering FileNotFoundException") {
     withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)

       // Read the directory in
-      val df = spark.read.parquet(location.getAbsolutePath)
+      val df = spark.read.orc(location.getAbsolutePath)
       assert(df.count() == 100)

       // Delete a file
@@ -60,14 +61,14 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }

-  test("SPARK-16337 temporary view refresh") {
+  test("SPARK-16337,SPARK-27504 temporary view refresh") {
     withTempView("view_refresh") {
       withTempPath { (location: File) =>
         // Create a Parquet directory
         spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-          .write.parquet(location.getAbsolutePath)
+          .write.orc(location.getAbsolutePath)

         // Read the directory in
-        spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+        spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
         assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)

         // Delete a file
@@ -93,10 +94,10 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
       withTempPath { (location: File) =>
         // Create a Parquet directory
         spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-          .write.parquet(location.getAbsolutePath)
+          .write.orc(location.getAbsolutePath)

         // Read the directory in
-        spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+        spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")

         // Delete a file
         deleteOneFileInDirectory(location)
@@ -111,3 +112,17 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+class MetadataCacheV1Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
+
+class MetadataCacheV2Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org