This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d83f84a  [SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles
d83f84a is described below

commit d83f84a1229138b7935b4b18da80a96d3c5c3dde
Author: Josh Rosen <rosenvi...@gmail.com>
AuthorDate: Wed Jun 26 09:11:28 2019 +0900

    [SPARK-27676][SQL][SS] InMemoryFileIndex should respect spark.sql.files.ignoreMissingFiles
    
    ## What changes were proposed in this pull request?
    
    Spark's `InMemoryFileIndex` contains two places where `FileNotFound` 
exceptions are caught and logged as warnings (during [directory 
listing](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274)
 and [block location 
lookup](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources
 [...]
    
    I think that this is a dangerous default behavior because it can mask bugs 
caused by race conditions (e.g. overwriting a table while it's being read) or 
S3 consistency issues (there's more discussion on this in the [JIRA 
ticket](https://issues.apache.org/jira/browse/SPARK-27676)). Failing fast when 
we detect missing files is not sufficient to make concurrent table reads/writes 
or S3 listing safe (there are other classes of eventual consistency issues to 
worry about), but I think it's  [...]
    
    There may be some cases where users _do_ want to ignore missing files, but 
I think that should be an opt-in behavior via the existing 
`spark.sql.files.ignoreMissingFiles` flag (the current behavior is itself 
race-prone because a file might be deleted between catalog listing and query 
execution time, triggering FileNotFoundExceptions on executors (which are 
handled in a way that _does_ respect `ignoreMissingFiles`)).
    
    This PR updates `InMemoryFileIndex` to guard the 
log-and-ignore-FileNotFoundException behind the existing 
`spark.sql.files.ignoreMissingFiles` flag.
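    
    For users who relied on the previous lenient listing, a minimal sketch of 
the opt-in (the table path below is illustrative only, not from this PR):
    
    ```scala
    // Hypothetical usage sketch: restore the old log-and-ignore behavior by
    // opting in via the existing flag before any file listing is triggered.
    spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
    // The listing then tolerates files/subdirectories that vanish mid-listing.
    val df = spark.read.parquet("/tmp/example/table")  // illustrative path
    df.count()
    ```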
    
    **Note**: this is a change of default behavior, so I think it needs to be 
mentioned in the release notes.
    
    ## How was this patch tested?
    
    New unit tests to simulate file-deletion race conditions, tested with both 
values of the `ignoreMissingFiles` flag.
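    
    A condensed, illustrative sketch of the simulation approach (the full mock 
file systems appear in `FileIndexSuite.scala` in the diff below; the class name 
here is hypothetical):
    
    ```scala
    import java.io.FileNotFoundException
    import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
    
    // Advertises a subdirectory in the root listing, then throws
    // FileNotFoundException when that subdirectory is itself listed, mimicking
    // a deletion racing with InMemoryFileIndex's recursive listing.
    class VanishingSubdirFileSystem extends RawLocalFileSystem {
      override def getScheme: String = "mockFs"
      override def listStatus(path: Path): Array[FileStatus] = {
        if (path == new Path("mockFs:///rootDir/")) {
          Array(new FileStatus(0, true, 0, 0, 0, new Path(path, "subDir")))
        } else {
          throw new FileNotFoundException(path.toString)
        }
      }
    }
    ```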
    
    Closes #24668 from JoshRosen/SPARK-27676.
    
    Lead-authored-by: Josh Rosen <rosenvi...@gmail.com>
    Co-authored-by: Josh Rosen <joshro...@stripe.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 docs/sql-migration-guide-upgrade.md                |   2 +
 .../spark/sql/execution/command/CommandUtils.scala |   2 +-
 .../execution/datasources/InMemoryFileIndex.scala  |  71 ++++++++++--
 .../sql/execution/datasources/FileIndexSuite.scala | 126 ++++++++++++++++++++-
 4 files changed, 188 insertions(+), 13 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index b062a04..c920f06 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -145,6 +145,8 @@ license: |
 
   - Since Spark 3.0, a higher-order function `exists` follows the three-valued 
boolean logic, i.e., if the `predicate` returns any `null`s and no `true` is 
obtained, then `exists` will return `null` instead of `false`. For example, 
`exists(array(1, null, 3), x -> x % 2 == 0)` will be `null`. The previous 
behaviour can be restored by setting 
`spark.sql.legacy.arrayExistsFollowsThreeValuedLogic` to `false`.
 
+  - Since Spark 3.0, if files or subdirectories disappear during recursive 
directory listing (i.e. they appear in an intermediate listing but then cannot 
be read or listed during later phases of the recursive directory listing, due 
to either concurrent file deletions or object store consistency issues) then 
the listing will fail with an exception unless 
`spark.sql.files.ignoreMissingFiles` is `true` (default `false`). In previous 
versions, these missing files or subdirectories would be i [...]
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without 
units like "30" rather than "30s", was
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index cac2519..b644e6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -67,7 +67,7 @@ object CommandUtils extends Logging {
           override def accept(path: Path): Boolean = isDataPath(path, 
stagingDir)
         }
         val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
-          paths, sessionState.newHadoopConf(), pathFilter, spark)
+          paths, sessionState.newHadoopConf(), pathFilter, spark, areRootPaths 
= true)
         fileStatusSeq.flatMap(_._2.map(_.getLen)).sum
       } else {
         partitions.map { p =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 45cf924..cf7a130 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -125,7 +125,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, 
this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch, hadoopConf, filter, sparkSession)
+      pathsToFetch, hadoopConf, filter, sparkSession, areRootPaths = true)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -167,12 +167,22 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession,
+      areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
+
+    val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
     // Short-circuits parallel listing when serial listing is likely to be 
faster.
     if (paths.size <= 
sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
       return paths.map { path =>
-        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+        val leafFiles = listLeafFiles(
+          path,
+          hadoopConf,
+          filter,
+          Some(sparkSession),
+          ignoreMissingFiles = ignoreMissingFiles,
+          isRootPath = areRootPaths)
+        (path, leafFiles)
       }
     }
 
@@ -205,7 +215,14 @@ object InMemoryFileIndex extends Logging {
         .mapPartitions { pathStrings =>
           val hadoopConf = serializableConfiguration.value
           pathStrings.map(new Path(_)).toSeq.map { path =>
-            (path, listLeafFiles(path, hadoopConf, filter, None))
+            val leafFiles = listLeafFiles(
+              path,
+              hadoopConf,
+              filter,
+              None,
+              ignoreMissingFiles = ignoreMissingFiles,
+              isRootPath = areRootPaths)
+            (path, leafFiles)
           }.iterator
         }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
@@ -268,11 +285,12 @@ object InMemoryFileIndex extends Logging {
       path: Path,
       hadoopConf: Configuration,
       filter: PathFilter,
-      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
+      sessionOpt: Option[SparkSession],
+      ignoreMissingFiles: Boolean,
+      isRootPath: Boolean): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
 
-    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't 
exist
     // Note that statuses only include FileStatus for the files and dirs 
directly under path,
     // and does not include anything else recursively.
     val statuses: Array[FileStatus] = try {
@@ -290,7 +308,26 @@ object InMemoryFileIndex extends Logging {
         case _ => fs.listStatus(path)
       }
     } catch {
-      case _: FileNotFoundException =>
+      // If we are listing a root path (e.g. a top level directory of a 
table), we need to
+      // ignore FileNotFoundExceptions during this root level of the listing 
because
+      //
+      //  (a) certain code paths might construct an InMemoryFileIndex with 
root paths that
+      //      might not exist (i.e. not all callers are guaranteed to have 
checked
+      //      path existence prior to constructing InMemoryFileIndex) and,
+      //  (b) we need to ignore deleted root paths during REFRESH TABLE, 
otherwise we break
+      //      existing behavior and break the ability to drop SessionCatalog 
tables when tables'
+      //      root directories have been deleted (which breaks a number of 
Spark's own tests).
+      //
+      // If we are NOT listing a root path then a FileNotFoundException here 
means that the
+      // directory was present in a previous level of file listing but is 
absent in this
+      // listing, likely indicating a race condition (e.g. concurrent table 
overwrite or S3
+      // list inconsistency).
+      //
+      // The trade-off in supporting existing behaviors / use-cases is that we 
won't be
+      // able to detect race conditions involving root paths being deleted 
during
+      // InMemoryFileIndex construction. However, it's still a net improvement 
to detect and
+      // fail-fast on the non-root cases. For more info see the SPARK-27676 
review discussion.
+      case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
         logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
         Array.empty[FileStatus]
     }
@@ -301,9 +338,23 @@ object InMemoryFileIndex extends Logging {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = sessionOpt match {
         case Some(session) =>
-          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, 
session).flatMap(_._2)
+          bulkListLeafFiles(
+            dirs.map(_.getPath),
+            hadoopConf,
+            filter,
+            session,
+            areRootPaths = false
+          ).flatMap(_._2)
         case _ =>
-          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, 
sessionOpt))
+          dirs.flatMap { dir =>
+            listLeafFiles(
+              dir.getPath,
+              hadoopConf,
+              filter,
+              sessionOpt,
+              ignoreMissingFiles = ignoreMissingFiles,
+              isRootPath = false)
+          }
       }
       val allFiles = topLevelFiles ++ nestedFiles
       if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else 
allFiles
@@ -345,7 +396,7 @@ object InMemoryFileIndex extends Logging {
           }
           Some(lfs)
         } catch {
-          case _: FileNotFoundException =>
+          case _: FileNotFoundException if ignoreMissingFiles =>
             missingFiles += f.getPath.toString
             None
         }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index bc83f3d..2a5c5a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -17,13 +17,14 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.File
+import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path, RawLocalFileSystem}
 
+import org.apache.spark.SparkException
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
@@ -167,7 +168,7 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
-  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: root folders that don't exist don't throw 
exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
@@ -178,6 +179,67 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("SPARK-27676: InMemoryFileIndex respects ignoreMissingFiles config for 
non-root paths") {
+    import DeletionRaceFileSystem._
+    for (
+      raceCondition <- Seq(
+        classOf[SubdirectoryDeletionRaceFileSystem],
+        classOf[FileDeletionRaceFileSystem]
+      );
+      ignoreMissingFiles <- Seq(true, false);
+      parDiscoveryThreshold <- Seq(0, 100)
+    ) {
+      withClue(s"raceCondition=$raceCondition, 
ignoreMissingFiles=$ignoreMissingFiles, " +
+        s"parDiscoveryThreshold=$parDiscoveryThreshold"
+      ) {
+        withSQLConf(
+          SQLConf.IGNORE_MISSING_FILES.key -> ignoreMissingFiles.toString,
+          SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> 
parDiscoveryThreshold.toString,
+          "fs.mockFs.impl" -> raceCondition.getName,
+          "fs.mockFs.impl.disable.cache" -> "true"
+        ) {
+          def makeCatalog(): InMemoryFileIndex = new InMemoryFileIndex(
+            spark, Seq(rootDirPath), Map.empty, None)
+          if (ignoreMissingFiles) {
+            // We're ignoring missing files, so catalog construction should 
succeed
+            val catalog = makeCatalog()
+            val leafFiles = catalog.listLeafFiles(catalog.rootPaths)
+            if (raceCondition == classOf[SubdirectoryDeletionRaceFileSystem]) {
+              // The only subdirectory was missing, so there should be no leaf 
files:
+              assert(leafFiles.isEmpty)
+            } else {
+              assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+              // One of the two leaf files was missing, but we should still 
list the other:
+              assert(leafFiles.size == 1)
+              assert(leafFiles.head.getPath == nonDeletedLeafFilePath)
+            }
+          } else {
+            // We're NOT ignoring missing files, so catalog construction 
should fail
+            val e = intercept[Exception] {
+              makeCatalog()
+            }
+            // The exact exception depends on whether we're using parallel 
listing
+            if (parDiscoveryThreshold == 0) {
+              // The FileNotFoundException occurs in a Spark executor (as part 
of a job)
+              assert(e.isInstanceOf[SparkException])
+              assert(e.getMessage.contains("FileNotFoundException"))
+            } else {
+              // The FileNotFoundException occurs directly on the driver
+              assert(e.isInstanceOf[FileNotFoundException])
+              // Test that the FileNotFoundException is triggered for the 
expected reason:
+              if (raceCondition == 
classOf[SubdirectoryDeletionRaceFileSystem]) {
+                assert(e.getMessage.contains(subDirPath.toString))
+              } else {
+                assert(raceCondition == classOf[FileDeletionRaceFileSystem])
+                assert(e.getMessage.contains(leafFilePath.toString))
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex listing parallelized with many top level 
dirs") {
     for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
       withTempDir { dir =>
@@ -356,6 +418,66 @@ class FileIndexSuite extends SharedSQLContext {
 
 }
 
+object DeletionRaceFileSystem {
+  val rootDirPath: Path = new Path("mockFs:///rootDir/")
+  val subDirPath: Path = new Path(rootDirPath, "subDir")
+  val leafFilePath: Path = new Path(subDirPath, "leafFile")
+  val nonDeletedLeafFilePath: Path = new Path(subDirPath, "nonDeletedLeafFile")
+  val rootListing: Array[FileStatus] =
+    Array(new FileStatus(0, true, 0, 0, 0, subDirPath))
+  val subFolderListing: Array[FileStatus] =
+    Array(
+      new FileStatus(0, false, 0, 100, 0, leafFilePath),
+      new FileStatus(0, false, 0, 100, 0, nonDeletedLeafFilePath))
+}
+
+// Used in SPARK-27676 test to simulate a race where a subdirectory is deleted
+// between back-to-back listing calls.
+class SubdirectoryDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      throw new FileNotFoundException(subDirPath.toString)
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+}
+
+// Used in SPARK-27676 test to simulate a race where a file is deleted between
+// being listed and having its size / file status checked.
+class FileDeletionRaceFileSystem extends RawLocalFileSystem {
+  import DeletionRaceFileSystem._
+
+  override def getScheme: String = "mockFs"
+
+  override def listStatus(path: Path): Array[FileStatus] = {
+    if (path == rootDirPath) {
+      rootListing
+    } else if (path == subDirPath) {
+      subFolderListing
+    } else {
+      throw new IllegalArgumentException()
+    }
+  }
+
+  override def getFileBlockLocations(
+      file: FileStatus,
+      start: Long,
+      len: Long): Array[BlockLocation] = {
+    if (file.getPath == leafFilePath) {
+      throw new FileNotFoundException(leafFilePath.toString)
+    } else {
+      Array.empty
+    }
+  }
+}
+
 class FakeParentPathFileSystem extends RawLocalFileSystem {
   override def getScheme: String = "mockFs"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
