This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 31488e1  [SPARK-27504][SQL] File source V2: support refreshing 
metadata cache
31488e1 is described below

commit 31488e1ca506efd34459e6bc9a08b6d0956c8d44
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Fri Apr 19 18:26:03 2019 +0800

    [SPARK-27504][SQL] File source V2: support refreshing metadata cache
    
    ## What changes were proposed in this pull request?
    
    In file source V1, if some file is deleted manually, reading the 
DataFrame/Table will throws an exception with suggestion message
    ```
    It is possible the underlying files have been updated. You can explicitly 
invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in 
SQL or by recreating the Dataset/DataFrame involved.
    ```
    After refreshing the table/DataFrame, the reads should return correct 
results.
    
    We should follow it in file source V2 as well.
    ## How was this patch tested?
    Unit test
    
    Closes #24401 from gengliangwang/refreshFileTable.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/v2/DataSourceV2Relation.scala      |  5 +++
 .../datasources/v2/FilePartitionReader.scala       | 38 ++++++++++++----------
 .../org/apache/spark/sql/MetadataCacheSuite.scala  | 35 ++++++++++++++------
 3 files changed, 50 insertions(+), 28 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4119957..e7e0be0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -66,6 +66,11 @@ case class DataSourceV2Relation(
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+
+  override def refresh(): Unit = table match {
+    case table: FileTable => table.fileIndex.refresh()
+    case _ => // Do nothing.
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 7c7b468..d4bad29 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -34,25 +34,27 @@ class FilePartitionReader[T](readers: 
Iterator[PartitionedFileReader[T]])
   override def next(): Boolean = {
     if (currentReader == null) {
       if (readers.hasNext) {
-        if (ignoreMissingFiles || ignoreCorruptFiles) {
-          try {
-            currentReader = getNextReader()
-          } catch {
-            case e: FileNotFoundException if ignoreMissingFiles =>
-              logWarning(s"Skipped missing file: $currentReader", e)
-              currentReader = null
-              return false
-            // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
-            case e: FileNotFoundException if !ignoreMissingFiles => throw e
-            case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
-              logWarning(
-                s"Skipped the rest of the content in the corrupted file: 
$currentReader", e)
-              currentReader = null
-              InputFileBlockHolder.unset()
-              return false
-          }
-        } else {
+        try {
           currentReader = getNextReader()
+        } catch {
+          case e: FileNotFoundException if ignoreMissingFiles =>
+            logWarning(s"Skipped missing file: $currentReader", e)
+            currentReader = null
+            return false
+          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
+          case e: FileNotFoundException if !ignoreMissingFiles =>
+            throw new FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by recreating the Dataset/DataFrame involved.")
+          case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
+            logWarning(
+              s"Skipped the rest of the content in the corrupted file: 
$currentReader", e)
+            currentReader = null
+            InputFileBlockHolder.unset()
+            return false
         }
       } else {
         return false
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index 98aa447..664d59c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql
 
 import java.io.File
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
  * Test suite to handle metadata cache related.
  */
-class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+abstract class MetadataCacheSuite extends QueryTest with SharedSQLContext {
 
   /** Removes one data file in the given directory. */
   private def deleteOneFileInDirectory(dir: File): Unit = {
@@ -38,14 +38,15 @@ class MetadataCacheSuite extends QueryTest with 
SharedSQLContext {
     oneFile.foreach(_.delete())
   }
 
-  test("SPARK-16336 Suggest doing table refresh when encountering 
FileNotFoundException") {
+  test("SPARK-16336,SPARK-27504 Suggest doing table refresh " +
+    "when encountering FileNotFoundException") {
     withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)
 
       // Read the directory in
-      val df = spark.read.parquet(location.getAbsolutePath)
+      val df = spark.read.orc(location.getAbsolutePath)
       assert(df.count() == 100)
 
       // Delete a file
@@ -60,14 +61,14 @@ class MetadataCacheSuite extends QueryTest with 
SharedSQLContext {
     }
   }
 
-  test("SPARK-16337 temporary view refresh") {
+  test("SPARK-16337,SPARK-27504 temporary view refresh") {
     withTempView("view_refresh") { withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-        .write.parquet(location.getAbsolutePath)
+        .write.orc(location.getAbsolutePath)
 
       // Read the directory in
-      
spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      
spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
       assert(sql("select count(*) from view_refresh").first().getLong(0) == 
100)
 
       // Delete a file
@@ -93,10 +94,10 @@ class MetadataCacheSuite extends QueryTest with 
SharedSQLContext {
         withTempPath { (location: File) =>
           // Create a Parquet directory
           spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
-            .write.parquet(location.getAbsolutePath)
+            .write.orc(location.getAbsolutePath)
 
           // Read the directory in
-          
spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+          
spark.read.orc(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
 
           // Delete a file
           deleteOneFileInDirectory(location)
@@ -111,3 +112,17 @@ class MetadataCacheSuite extends QueryTest with 
SharedSQLContext {
     }
   }
 }
+
+class MetadataCacheV1Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+}
+
+class MetadataCacheV2Suite extends MetadataCacheSuite {
+  override protected def sparkConf: SparkConf =
+    super
+      .sparkConf
+      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to