This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fd009d652f7 [SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & 
FileSystem.isDirectory is deprecated`
fd009d652f7 is described below

commit fd009d652f7922254ccc7cc631b8df3a6b821532
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Sun Dec 10 14:11:19 2023 -0800

    [SPARK-45642][CORE][SQL] Fix `FileSystem.isFile & FileSystem.isDirectory is 
deprecated`
    
    ### What changes were proposed in this pull request?
    This PR replaces calls to the deprecated `FileSystem.isFile` and 
`FileSystem.isDirectory` methods with `FileSystem.getFileStatus`-based checks, 
and makes some error messages more accurate.
    
    ### Why are the changes needed?
    - Prepare for a future Hadoop release that removes these deprecated methods.
    - Reduce deprecation warnings.
    - Make some error messages more accurate.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GitHub Actions (GA).
    - Manually tested.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43505 from panbingkun/SPARK-45642.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala    |  9 ++++++++-
 .../apache/spark/deploy/history/FsHistoryProvider.scala    |  2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala      |  2 +-
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala   |  2 +-
 .../execution/datasources/PartitioningAwareFileIndex.scala | 14 +++++++++++---
 .../spark/sql/execution/streaming/FileStreamSink.scala     |  2 +-
 .../scala/org/apache/spark/streaming/util/HdfsUtils.scala  |  2 +-
 7 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 50906f76b6e..628b688dedb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream, File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream, File, FileNotFoundException, IOException}
 import java.net.InetAddress
 import java.security.PrivilegedExceptionAction
 import java.text.DateFormat
@@ -593,4 +593,11 @@ private[spark] object SparkHadoopUtil extends Logging {
     }
   }
 
+  def isFile(fs: FileSystem, path: Path): Boolean = {
+    try {
+      fs.getFileStatus(path).isFile
+    } catch {
+      case _: FileNotFoundException => false
+    }
+  }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 565499bb610..73fb0086b33 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -860,7 +860,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
           try {
             // Fetch the entry first to avoid an RPC when it's already removed.
             listing.read(classOf[LogInfo], inProgressLog)
-            if (!fs.isFile(new Path(inProgressLog))) {
+            if (!SparkHadoopUtil.isFile(fs, new Path(inProgressLog))) {
               listing.synchronized {
                 listing.delete(classOf[LogInfo], inProgressLog)
               }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 049999281f5..a074bd53d26 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -674,7 +674,7 @@ private[spark] object Utils
       throw new IOException(s"Failed to create directory ${targetDir.getPath}")
     }
     val dest = new File(targetDir, filename.getOrElse(path.getName))
-    if (fs.isFile(path)) {
+    if (fs.getFileStatus(path).isFile) {
       val in = fs.open(path)
       try {
         downloadFile(path.toString, in, dest, fileOverwrite)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index e235b8aeb77..d16a15df1b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1774,7 +1774,7 @@ class TestFileSystem extends 
org.apache.hadoop.fs.LocalFileSystem {
     status
   }
 
-  override def isFile(path: Path): Boolean = super.isFile(local(path))
+  override def getFileStatus(path: Path): FileStatus = 
super.getFileStatus(local(path))
 
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     val newPath = new Path(pathPattern.toUri.getPath)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index dc41afe226b..3efe614bcef 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -222,9 +224,15 @@ abstract class PartitioningAwareFileIndex(
     caseInsensitiveMap.get(FileIndexOptions.BASE_PATH_PARAM).map(new Path(_)) 
match {
       case Some(userDefinedBasePath) =>
         val fs = userDefinedBasePath.getFileSystem(hadoopConf)
-        if (!fs.isDirectory(userDefinedBasePath)) {
-          throw new IllegalArgumentException(s"Option 
'${FileIndexOptions.BASE_PATH_PARAM}' " +
-            s"must be a directory")
+        try {
+          if (!fs.getFileStatus(userDefinedBasePath).isDirectory) {
+            throw new IllegalArgumentException(s"Option 
'${FileIndexOptions.BASE_PATH_PARAM}' " +
+              s"must be a directory")
+          }
+        } catch {
+          case _: FileNotFoundException =>
+            throw new IllegalArgumentException(s"Option 
'${FileIndexOptions.BASE_PATH_PARAM}' " +
+             s"not found")
         }
         val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
         val qualifiedBasePathStr = qualifiedBasePath.toString
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 04a1de02ea5..ea8db3c99de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -51,7 +51,7 @@ object FileStreamSink extends Logging {
         val hdfsPath = new Path(singlePath)
         try {
           val fs = hdfsPath.getFileSystem(hadoopConf)
-          if (fs.isDirectory(hdfsPath)) {
+          if (fs.getFileStatus(hdfsPath).isDirectory) {
             val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
             fs.exists(metadataPath)
           } else {
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index ef040681adf..703fcb5edb3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -30,7 +30,7 @@ private[streaming] object HdfsUtils {
     val dfs = getFileSystemForPath(dfsPath, conf)
     // If the file exists and we have append support, append instead of 
creating a new file
     val stream: FSDataOutputStream = {
-      if (dfs.isFile(dfsPath)) {
+      if (SparkHadoopUtil.isFile(dfs, dfsPath)) {
         if (conf.getBoolean("dfs.support.append", true) ||
             conf.getBoolean("hdfs.append.support", false) ||
             dfs.isInstanceOf[RawLocalFileSystem]) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to