This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new faedcd91d55 [SPARK-41970] Introduce SparkPath for typesafety
faedcd91d55 is described below

commit faedcd91d554a00fc76116a0c188752cf036f907
Author: David Lewis <david.le...@databricks.com>
AuthorDate: Thu Jan 19 10:05:51 2023 +0800

    [SPARK-41970] Introduce SparkPath for typesafety
    
    ### What changes were proposed in this pull request?
    This PR proposes a strongly typed `SparkPath` that encapsulates a 
url-encoded string. It has helper methods for creating hadoop paths, uris, and 
uri-encoded strings.
    The intent is to identify and fix various bugs in the way that Spark 
handles these paths. To do this we introduced the SparkPath type to 
`PartitionedFile` (a widely used class), and then started fixing compile errors. 
In doing so we fixed various bugs.
    
    ### Why are the changes needed?
    
    Given `val str = "s3://bucket/path with space/a"`, there is a difference 
between `new Path(str)` and `new Path(new URI(str))`, and thus a difference 
between `new URI(str)` and `new Path(str).toUri`.
    Both `URI` and `Path` are symmetric in construction and `toString`, but are 
not interchangeable. Spark confuses these two paths (uri-encoded vs not). This 
PR attempts to use types to disambiguate them.
    
    ### Does this PR introduce _any_ user-facing change?
    
    This PR proposes changing the public API of `PartitionedFile`, and various 
other methods in the name of type safety. It needs to be clear to callers of an 
API what type of path string is expected.
    
    ### How was this patch tested?
    
    We rely on existing tests, and update the default temp path creation to 
include paths with spaces.
    
    Closes #39488 from databricks-david-lewis/SPARK_PATH.
    
    Authored-by: David Lewis <david.le...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala |  5 +-
 .../sql/v2/avro/AvroPartitionReaderFactory.scala   |  7 +--
 .../apache/spark/sql/avro/AvroRowReaderSuite.scala |  6 +--
 .../org/apache/spark/sql/avro/AvroSuite.scala      |  3 +-
 .../apache/spark/deploy/worker/WorkerWatcher.scala |  2 +-
 .../scala/org/apache/spark/paths/SparkPath.scala   | 55 ++++++++++++++++++++++
 .../scala/org/apache/spark/rpc/RpcAddress.scala    |  2 +-
 .../spark/ml/source/image/ImageFileFormat.scala    |  6 +--
 scalastyle-config.xml                              |  8 ++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  7 +--
 .../spark/sql/execution/DataSourceScanExec.scala   |  4 +-
 .../apache/spark/sql/execution/FileRelation.scala  |  4 +-
 .../spark/sql/execution/PartitionedFileUtil.scala  |  5 +-
 .../execution/datasources/CatalogFileIndex.scala   |  3 +-
 .../sql/execution/datasources/DataSource.scala     |  3 +-
 .../sql/execution/datasources/FileIndex.scala      |  3 +-
 .../sql/execution/datasources/FileScanRDD.scala    | 21 ++++++---
 .../datasources/HadoopFileLinesReader.scala        |  4 +-
 .../datasources/HadoopFileWholeTextReader.scala    |  4 +-
 .../execution/datasources/HadoopFsRelation.scala   |  3 +-
 .../datasources/PartitioningAwareFileIndex.scala   |  5 +-
 .../datasources/binaryfile/BinaryFileFormat.scala  |  3 +-
 .../execution/datasources/csv/CSVDataSource.scala  |  3 +-
 .../execution/datasources/csv/CSVFileFormat.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala          |  5 +-
 .../execution/datasources/orc/OrcFileFormat.scala  |  3 +-
 .../datasources/parquet/ParquetFileFormat.scala    |  4 +-
 .../datasources/v2/FilePartitionReader.scala       |  8 ++--
 .../sql/execution/datasources/v2/FileScan.scala    |  2 +-
 .../v2/csv/CSVPartitionReaderFactory.scala         |  2 +-
 .../v2/orc/OrcPartitionReaderFactory.scala         | 10 ++--
 .../v2/parquet/ParquetPartitionReaderFactory.scala | 11 ++---
 .../execution/streaming/FileStreamSinkLog.scala    | 17 ++++---
 .../sql/execution/streaming/FileStreamSource.scala | 35 ++++++++------
 .../spark/sql/FileBasedDataSourceSuite.scala       |  3 +-
 .../scala/org/apache/spark/sql/SubquerySuite.scala |  2 +-
 .../datasources/FileSourceStrategySuite.scala      |  9 ++--
 .../datasources/HadoopFileLinesReaderSuite.scala   |  7 ++-
 .../binaryfile/BinaryFileFormatSuite.scala         |  4 +-
 .../spark/sql/streaming/FileStreamSinkSuite.scala  |  9 ++--
 .../sql/streaming/FileStreamSourceSuite.scala      | 45 +++++++++---------
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  |  5 +-
 42 files changed, 216 insertions(+), 133 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index a13e0624f35..3e16e121081 100755
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.net.URI
 
 import scala.util.control.NonFatal
 
@@ -96,9 +95,9 @@ private[sql] class AvroFileFormat extends FileFormat
       // Doing input file filtering is improper because we may generate empty 
tasks that process no
       // input files but stress the scheduler. We should probably add a more 
general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
+      if (parsedOptions.ignoreExtension || 
file.urlEncodedPath.endsWith(".avro")) {
         val reader = {
-          val in = new FsInput(new Path(new URI(file.filePath)), conf)
+          val in = new FsInput(file.toPath, conf)
           try {
             val datumReader = userProvidedSchema match {
               case Some(userSchema) => new 
GenericDatumReader[GenericRecord](userSchema)
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
index 3ad63f113fe..cc7bd180e84 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -16,14 +16,11 @@
  */
 package org.apache.spark.sql.v2.avro
 
-import java.net.URI
-
 import scala.util.control.NonFatal
 
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
@@ -62,9 +59,9 @@ case class AvroPartitionReaderFactory(
     val conf = broadcastedConf.value.value
     val userProvidedSchema = options.schema
 
-    if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) 
{
+    if (options.ignoreExtension || 
partitionedFile.urlEncodedPath.endsWith(".avro")) {
       val reader = {
-        val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
+        val in = new FsInput(partitionedFile.toPath, conf)
         try {
           val datumReader = userProvidedSchema match {
             case Some(userSchema) => new 
GenericDatumReader[GenericRecord](userSchema)
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
index 53064371b2a..15b1e5ecf88 100644
--- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
+++ 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala
@@ -18,13 +18,11 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.net.URI
 
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
@@ -62,8 +60,8 @@ class AvroRowReaderSuite
         case BatchScanExec(_, f: AvroScan, _, _, _, _, _) => f
       }
       val filePath = fileScan.get.fileIndex.inputFiles(0)
-      val fileSize = new File(new URI(filePath)).length
-      val in = new FsInput(new Path(new URI(filePath)), new Configuration())
+      val fileSize = new File(filePath.toUri).length
+      val in = new FsInput(filePath.toPath, new Configuration())
       val reader = DataFileReader.openReader(in, new 
GenericDatumReader[GenericRecord]())
 
       val it = new Iterator[InternalRow] with AvroUtils.RowReader {
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f74274b0a3c..d4e85addf95 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -2357,7 +2357,8 @@ class AvroV2Suite extends AvroSuite with 
ExplainSuiteHelper {
       assert(fileScan.get.dataFilters.nonEmpty)
       assert(fileScan.get.planInputPartitions().forall { partition =>
         partition.asInstanceOf[FilePartition].files.forall { file =>
-          file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
+          file.urlEncodedPath.contains("p1=1") &&
+            file.urlEncodedPath.contains("p2=2")
         }
       })
       checkAnswer(df, Row("b", 1, 2))
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index b7a5728dd00..deb5bb1a697 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -47,7 +47,7 @@ private[spark] class WorkerWatcher(
   private[deploy] var isShutDown = false
 
   // Lets filter events only from the worker's rpc system
-  private val expectedAddress = RpcAddress.fromURIString(workerUrl)
+  private val expectedAddress = RpcAddress.fromUrlString(workerUrl)
   private def isWorker(address: RpcAddress) = expectedAddress == address
 
   private def exitNonZero() =
diff --git a/core/src/main/scala/org/apache/spark/paths/SparkPath.scala 
b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala
new file mode 100644
index 00000000000..5bc6233f6cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.paths
+
+import java.net.URI
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+/**
+ * A canonical representation of a file path. This class is intended to provide
+ * type-safety to the way that Spark handles Paths. Paths can be represented as
+ * Strings in multiple ways, which are not always compatible. Spark regularly 
uses
+ * two ways: 1. hadoop Path.toString and 2. java URI.toString.
+ */
+case class SparkPath private (private val underlying: String) {
+  def urlEncoded: String = underlying
+  def toUri: URI = new URI(underlying)
+  def toPath: Path = new Path(toUri)
+  override def toString: String = underlying
+}
+
+object SparkPath {
+  /**
+   * Creates a SparkPath from a hadoop Path string.
+   * Please be very sure that the provided string is encoded (or not encoded) 
in the right way.
+   *
+   * Please see the hadoop Path documentation here:
+   * 
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html#Path-java.lang.String-
+   */
+  def fromPathString(str: String): SparkPath = fromPath(new Path(str))
+  def fromPath(path: Path): SparkPath = fromUri(path.toUri)
+  def fromFileStatus(fs: FileStatus): SparkPath = fromPath(fs.getPath)
+
+  /**
+   * Creates a SparkPath from a url-encoded string.
+   * Note: It is the responsibility of the caller to ensure that str is a 
valid url-encoded string.
+   */
+  def fromUrlString(str: String): SparkPath = SparkPath(str)
+  def fromUri(uri: URI): SparkPath = fromUrlString(uri.toString)
+}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
index 9b0739c9447..675dc24206a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala
@@ -39,7 +39,7 @@ private[spark] case class RpcAddress(_host: String, port: 
Int) {
 private[spark] object RpcAddress {
 
   /** Return the [[RpcAddress]] represented by `uri`. */
-  def fromURIString(uri: String): RpcAddress = {
+  def fromUrlString(uri: String): RpcAddress = {
     val uriObj = new java.net.URI(uri)
     RpcAddress(uriObj.getHost, uriObj.getPort)
   }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
index 0995df51c64..206ce6f0675 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.source.image
 
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.ml.image.ImageSchema
@@ -71,8 +71,8 @@ private[image] class ImageFileFormat extends FileFormat with 
DataSourceRegister
       if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
         Iterator(emptyUnsafeRow)
       } else {
-        val origin = file.filePath
-        val path = new Path(origin)
+        val origin = file.urlEncodedPath
+        val path = file.toPath
         val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
         val stream = fs.open(path)
         val bytes = try {
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index f34b5d55e42..3dcb03b13fd 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -437,4 +437,12 @@ This file is divided into 3 sections:
       Use org.apache.spark.util.Utils.createTempDir instead.
     </customMessage>
   </check>
+
+  <check customId="pathfromuri" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">new Path\(new 
URI\(</parameter></parameters>
+    <customMessage><![CDATA[
+      Are you sure that this string is uri encoded? Please be careful when 
converting hadoop Paths
+      and URIs to and from String. If possible, please use SparkPath.
+    ]]></customMessage>
+  </check>
 </scalastyle>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c8e2a48859d..88c4fe511a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -34,6 +34,7 @@ import org.apache.spark.api.java.function._
 import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.api.r.RRDD
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, 
QueryPlanningTracker, ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -3924,18 +3925,18 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def inputFiles: Array[String] = {
-    val files: Seq[String] = queryExecution.optimizedPlan.collect {
+    val files: Seq[SparkPath] = queryExecution.optimizedPlan.collect {
       case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles
       case r: HiveTableRelation =>
-        r.tableMeta.storage.locationUri.map(_.toString).toArray
+        r.tableMeta.storage.locationUri.map(SparkPath.fromUri).toArray
       case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, 
_, _, _),
           _, _, _, _) =>
         table.fileIndex.inputFiles
     }.flatten
-    files.toSet.toArray
+    files.iterator.map(_.urlEncoded).toSet.toArray
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8dda88e86c0..0f4b8c563d2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -632,8 +632,8 @@ case class FileSourceScanExec(
         }
       }.groupBy { f =>
         BucketingUtils
-          .getBucketId(new Path(f.filePath).getName)
-          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
+          .getBucketId(f.toPath.getName)
+          .getOrElse(throw 
QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath))
       }
 
     val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
index a299fed7fd1..6a832b784fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.paths.SparkPath
+
 /**
  * An interface for relations that are backed by files.  When a class 
implements this interface,
  * the list of paths that it returns will be returned to a user who calls 
`inputPaths` on any
@@ -24,5 +26,5 @@ package org.apache.spark.sql.execution
  */
 trait FileRelation {
   /** Returns the list of files that will be read when scanning this relation. 
*/
-  def inputFiles: Array[String]
+  def inputFiles: Array[SparkPath]
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index 4cccd4132e9..fd5f2f25c0b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path}
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources._
@@ -36,7 +37,7 @@ object PartitionedFileUtil {
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file), offset, size)
-        PartitionedFile(partitionValues, filePath.toUri.toString, offset, 
size, hosts,
+        PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, 
size, hosts,
           file.getModificationTime, file.getLen)
       }
     } else {
@@ -49,7 +50,7 @@ object PartitionedFileUtil {
       filePath: Path,
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
-    PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, 
hosts,
+    PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, 
file.getLen, hosts,
       file.getModificationTime, file.getLen)
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 727b33018fb..f12b72f6867 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions._
@@ -94,7 +95,7 @@ class CatalogFileIndex(
     }
   }
 
-  override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+  override def inputFiles: Array[SparkPath] = filterPartitions(Nil).inputFiles
 
   // `CatalogFileIndex` may be a member of `HadoopFsRelation`, 
`HadoopFsRelation` may be a member
   // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. 
So we need to
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ad26ee21c2c..94dd3bc0bd6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -70,7 +70,8 @@ import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, 
Utils}
  *
  * @param paths A list of file system paths that hold data. These will be 
globbed before if
  *              the "__globPaths__" option is true, and will be qualified. 
This option only works
- *              when reading from a [[FileFormat]].
+ *              when reading from a [[FileFormat]]. These paths are expected 
to be hadoop [[Path]]
+ *              strings.
  * @param userSpecifiedSchema An optional specification of the schema of the 
data. When present
  *                            we skip attempting to infer the schema.
  * @param partitionColumns A list of column names that the relation is 
partitioned by. This list is
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 7bfc781797e..d9a63edca73 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs._
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
@@ -62,7 +63,7 @@ trait FileIndex {
    * Returns the list of files that will be read when scanning this relation. 
This call may be
    * very expensive for large tables.
    */
-  def inputFiles: Array[String]
+  def inputFiles: Array[SparkPath]
 
   /** Refresh any cached file listings */
   def refresh(): Unit
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 827d41dd096..0ccf72823f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.{Closeable, FileNotFoundException, IOException}
+import java.net.URI
 
 import scala.util.control.NonFatal
 
@@ -25,6 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, 
TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.rdd.{InputFileBlockHolder, RDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow}
@@ -51,12 +53,17 @@ import org.apache.spark.util.NextIterator
  */
 case class PartitionedFile(
     partitionValues: InternalRow,
-    filePath: String,
+    filePath: SparkPath,
     start: Long,
     length: Long,
     @transient locations: Array[String] = Array.empty,
     modificationTime: Long = 0L,
     fileSize: Long = 0L) {
+
+  def pathUri: URI = filePath.toUri
+  def toPath: Path = filePath.toPath
+  def urlEncodedPath: String = filePath.urlEncoded
+
   override def toString: String = {
     s"path: $filePath, range: $start-${start + length}, partition values: 
$partitionValues"
   }
@@ -140,14 +147,14 @@ class FileScanRDD(
       private def updateMetadataRow(): Unit =
         if (metadataColumns.nonEmpty && currentFile != null) {
           updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
-            new Path(currentFile.filePath), currentFile.fileSize, 
currentFile.modificationTime)
+            currentFile.toPath, currentFile.fileSize, 
currentFile.modificationTime)
         }
 
       /**
        * Create an array of constant column vectors containing all required 
metadata columns
        */
       private def createMetadataColumnVector(c: ColumnarBatch): 
Array[ColumnVector] = {
-        val path = new Path(currentFile.filePath)
+        val path = currentFile.toPath
         metadataColumns.map(_.name).map {
           case FILE_PATH =>
             val columnVector = new ConstantColumnVector(c.numRows(), 
StringType)
@@ -223,7 +230,8 @@ class FileScanRDD(
           updateMetadataRow()
           logInfo(s"Reading File $currentFile")
           // Sets InputFileBlockHolder for the file block's information
-          InputFileBlockHolder.set(currentFile.filePath, currentFile.start, 
currentFile.length)
+          InputFileBlockHolder
+            .set(currentFile.urlEncodedPath, currentFile.start, 
currentFile.length)
 
           resetCurrentIterator()
           if (ignoreMissingFiles || ignoreCorruptFiles) {
@@ -278,12 +286,13 @@ class FileScanRDD(
           } catch {
             case e: SchemaColumnConvertNotSupportedException =>
               throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
-                currentFile.filePath, e.getColumn, e.getLogicalType, 
e.getPhysicalType, e)
+                currentFile.urlEncodedPath, e.getColumn, e.getLogicalType, 
e.getPhysicalType, e)
             case sue: SparkUpgradeException => throw sue
             case NonFatal(e) =>
               e.getCause match {
                 case sue: SparkUpgradeException => throw sue
-                case _ => throw QueryExecutionErrors.cannotReadFilesError(e, 
currentFile.filePath)
+                case _ =>
+                  throw QueryExecutionErrors.cannotReadFilesError(e, 
currentFile.urlEncodedPath)
               }
           }
         } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
index b5e276bd421..5ec17290c37 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.Closeable
-import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader}
@@ -48,7 +46,7 @@ class HadoopFileLinesReader(
 
   private val _iterator = {
     val fileSplit = new FileSplit(
-      new Path(new URI(file.filePath)),
+      file.toPath,
       file.start,
       file.length,
       // The locality is decided by `getPreferredLocations` in `FileScanRDD`.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
index a48001f04a9..17649f62d84 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.Closeable
-import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
@@ -37,7 +35,7 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: 
Configuration)
   extends Iterator[Text] with Closeable {
   private val _iterator = {
     val fileSplit = new CombineFileSplit(
-      Array(new Path(new URI(file.filePath))),
+      Array(file.toPath),
       Array(file.start),
       Array(file.length),
       // The locality is decided by `getPreferredLocations` in `FileScanRDD`.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index fd1824055dc..bd04ddb2ec6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
@@ -70,5 +71,5 @@ case class HadoopFsRelation(
   }
 
 
-  override def inputFiles: Array[String] = location.inputFiles
+  override def inputFiles: Array[SparkPath] = location.inputFiles
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 137fd6fe1ac..2d8c7b19507 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
@@ -136,8 +137,8 @@ abstract class PartitioningAwareFileIndex(
   }
 
   /** Returns the list of files that will be read when scanning this relation. 
*/
-  override def inputFiles: Array[String] =
-    allFiles().map(_.getPath.toUri.toString).toArray
+  override def inputFiles: Array[SparkPath] =
+    allFiles().map(SparkPath.fromFileStatus).toArray
 
   override def sizeInBytes: Long = allFiles().map(_.getLen).sum
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index 43512ff5ac8..ba6d351761e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.binaryfile
 
-import java.net.URI
 import java.sql.Timestamp
 
 import com.google.common.io.{ByteStreams, Closeables}
@@ -101,7 +100,7 @@ class BinaryFileFormat extends FileFormat with 
DataSourceRegister {
     val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)
 
     file: PartitionedFile => {
-      val path = new Path(new URI(file.filePath))
+      val path = file.toPath
       val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
       val status = fs.getFileStatus(path)
       if (filterFuncs.forall(_.apply(status))) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index d8fa768a604..99d43953c4c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
@@ -179,7 +178,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
       headerChecker: CSVHeaderChecker,
       requiredSchema: StructType): Iterator[InternalRow] = {
     UnivocityParser.parseStream(
-      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new 
URI(file.filePath))),
+      CodecStreams.createInputStreamWithCloseResource(conf, file.toPath),
       parser,
       headerChecker,
       requiredSchema)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 93679516a8c..2a6c209ff0c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -128,7 +128,7 @@ class CSVFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
       val schema = if (columnPruning) actualRequiredSchema else 
actualDataSchema
       val isStartOfFile = file.start == 0
       val headerChecker = new CSVHeaderChecker(
-        schema, parsedOptions, source = s"CSV file: ${file.filePath}", 
isStartOfFile)
+        schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", 
isStartOfFile)
       CSVDataSource(parsedOptions).readFile(
         conf,
         file,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 2f4cd468457..7c98c31bba2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.InputStream
-import java.net.URI
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
@@ -211,7 +210,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       schema: StructType): Iterator[InternalRow] = {
     def partitionedFileString(ignored: Any): UTF8String = {
       Utils.tryWithResource {
-        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new 
URI(file.filePath)))
+        CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
       } { inputStream =>
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
@@ -227,6 +226,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
-      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new 
URI(file.filePath))))
+      CodecStreams.createInputStreamWithCloseResource(conf, file.toPath))
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 6a58513c346..cb18566e848 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.io._
-import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -164,7 +163,7 @@ class OrcFileFormat
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
 
-      val filePath = new Path(new URI(file.filePath))
+      val filePath = file.toPath
 
       val fs = filePath.getFileSystem(conf)
       val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 80b6791d8fa..6b4651e3260 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.net.URI
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Try}
@@ -200,7 +198,7 @@ class ParquetFileFormat
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
-      val filePath = new Path(new URI(file.filePath))
+      val filePath = file.toPath
       val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
       val sharedConf = broadcastedHadoopConf.value.value
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index 782c1f50d80..7159bc6de3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -66,7 +66,8 @@ class FilePartitionReader[T](
     } catch {
       case e: SchemaColumnConvertNotSupportedException =>
         throw QueryExecutionErrors.unsupportedSchemaColumnConvertError(
-          currentReader.file.filePath, e.getColumn, e.getLogicalType, 
e.getPhysicalType, e)
+          currentReader.file.urlEncodedPath,
+          e.getColumn, e.getLogicalType, e.getPhysicalType, e)
       case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
         logWarning(
           s"Skipped the rest of the content in the corrupted file: 
$currentReader", e)
@@ -75,7 +76,8 @@ class FilePartitionReader[T](
       case NonFatal(e) =>
         e.getCause match {
           case sue: SparkUpgradeException => throw sue
-          case _ => throw QueryExecutionErrors.cannotReadFilesError(e, 
currentReader.file.filePath)
+          case _ => throw QueryExecutionErrors.cannotReadFilesError(e,
+            currentReader.file.urlEncodedPath)
         }
     }
     if (hasNext) {
@@ -101,7 +103,7 @@ class FilePartitionReader[T](
     logInfo(s"Reading file $reader")
     // Sets InputFileBlockHolder for the file block's information
     val file = reader.file
-    InputFileBlockHolder.set(file.filePath, file.start, file.length)
+    InputFileBlockHolder.set(file.urlEncodedPath, file.start, file.length)
     reader
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
index 9b6f9932866..0cfb55ab407 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
@@ -163,7 +163,7 @@ trait FileScan extends Scan
     }
 
     if (splitFiles.length == 1) {
-      val path = new Path(splitFiles(0).filePath)
+      val path = splitFiles(0).toPath
       if (!isSplitable(path) && splitFiles(0).length >
         sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) {
         logWarning(s"Loading one large unsplittable file ${path.toString} with 
only one " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
index f8a17c8eaa8..37f6ae4aaa9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala
@@ -61,7 +61,7 @@ case class CSVPartitionReaderFactory(
     val schema = if (options.columnPruning) actualReadDataSchema else 
actualDataSchema
     val isStartOfFile = file.start == 0
     val headerChecker = new CSVHeaderChecker(
-      schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile)
+      schema, options, source = s"CSV file: ${file.urlEncodedPath}", 
isStartOfFile)
     val iter = CSVDataSource(options).readFile(
       conf,
       file,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
index 4f93a67cc46..2b7bdae6b31 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.orc
 
-import java.net.URI
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
@@ -86,7 +84,7 @@ case class OrcPartitionReaderFactory(
     if (aggregation.nonEmpty) {
       return buildReaderWithAggregates(file, conf)
     }
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
 
     val orcSchema = Utils.tryWithResource(createORCReader(filePath, 
conf))(_.getSchema)
     val resultedColPruneInfo = OrcUtils.requestedColumnIds(
@@ -127,7 +125,7 @@ case class OrcPartitionReaderFactory(
     if (aggregation.nonEmpty) {
       return buildColumnarReaderWithAggregates(file, conf)
     }
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
 
     val orcSchema = Utils.tryWithResource(createORCReader(filePath, 
conf))(_.getSchema)
     val resultedColPruneInfo = OrcUtils.requestedColumnIds(
@@ -181,7 +179,7 @@ case class OrcPartitionReaderFactory(
   private def buildReaderWithAggregates(
       file: PartitionedFile,
       conf: Configuration): PartitionReader[InternalRow] = {
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
     new PartitionReader[InternalRow] {
       private var hasNext = true
       private lazy val row: InternalRow = {
@@ -209,7 +207,7 @@ case class OrcPartitionReaderFactory(
   private def buildColumnarReaderWithAggregates(
       file: PartitionedFile,
       conf: Configuration): PartitionReader[ColumnarBatch] = {
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
     new PartitionReader[ColumnarBatch] {
       private var hasNext = true
       private lazy val batch: ColumnarBatch = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 121ebe1cfa2..5951c1d8dd9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -16,10 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.parquet
 
-import java.net.URI
 import java.time.ZoneId
 
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
@@ -89,7 +87,7 @@ case class ParquetPartitionReaderFactory(
 
   private def getFooter(file: PartitionedFile): ParquetMetadata = {
     val conf = broadcastedConf.value.value
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
 
     if (aggregation.isEmpty) {
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
@@ -132,7 +130,8 @@ case class ParquetPartitionReaderFactory(
           val footer = getFooter(file)
 
           if (footer != null && footer.getBlocks.size > 0) {
-            ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath, 
dataSchema,
+            ParquetUtils.createAggInternalRowFromFooter(footer, 
file.urlEncodedPath,
+              dataSchema,
               partitionSchema, aggregation.get, readDataSchema, 
file.partitionValues,
               getDatetimeRebaseSpec(footer.getFileMetaData))
           } else {
@@ -175,7 +174,7 @@ case class ParquetPartitionReaderFactory(
         private val batch: ColumnarBatch = {
           val footer = getFooter(file)
           if (footer != null && footer.getBlocks.size > 0) {
-            val row = ParquetUtils.createAggInternalRowFromFooter(footer, 
file.filePath,
+            val row = ParquetUtils.createAggInternalRowFromFooter(footer, 
file.urlEncodedPath,
               dataSchema, partitionSchema, aggregation.get, readDataSchema, 
file.partitionValues,
               getDatetimeRebaseSpec(footer.getFileMetaData))
             AggregatePushDownUtils.convertAggregatesRowToBatch(
@@ -209,7 +208,7 @@ case class ParquetPartitionReaderFactory(
           RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = {
     val conf = broadcastedConf.value.value
 
-    val filePath = new Path(new URI(file.filePath))
+    val filePath = file.toPath
     val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
 
     lazy val footerFileMetaData = getFooter(file).getFileMetaData
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 2d70d95c685..94ba8b8aa51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.net.URI
-
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.internal.SQLConf
 
@@ -30,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf
  * The status of a file outputted by [[FileStreamSink]]. A file is visible 
only if it appears in
  * the sink log and its action is not "delete".
  *
- * @param path the file path.
+ * @param path the file path as a uri-encoded string.
  * @param size the file size.
  * @param isDir whether this file is a directory.
  * @param modificationTime the file last modification time.
@@ -46,17 +45,23 @@ case class SinkFileStatus(
     blockReplication: Int,
     blockSize: Long,
     action: String) {
+  def sparkPath: SparkPath = SparkPath.fromUrlString(path) // `path` is uri-encoded
 
   def toFileStatus: FileStatus = {
     new FileStatus(
-      size, isDir, blockReplication, blockSize, modificationTime, new Path(new 
URI(path)))
+      size,
+      isDir,
+      blockReplication,
+      blockSize,
+      modificationTime,
+      SparkPath.fromUrlString(path).toPath)
   }
 }
 
 object SinkFileStatus {
   def apply(f: FileStatus): SinkFileStatus = {
     SinkFileStatus(
-      path = f.getPath.toUri.toString,
+      path = SparkPath.fromPath(f.getPath).urlEncoded,
       size = f.getLen,
       isDir = f.isDirectory,
       modificationTime = f.getModificationTime,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 5baf3d29a49..6eb2ffef44e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.net.URI
 import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.TimeUnit._
 
@@ -28,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, 
GlobFilter, Path}
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming
@@ -109,16 +109,16 @@ class FileStreamSource(
   // Visible for testing and debugging in production.
   val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
-  private var allFilesForTriggerAvailableNow: Seq[(String, Long)] = _
+  private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
 
   metadataLog.restore().foreach { entry =>
-    seenFiles.add(entry.path, entry.timestamp)
+    seenFiles.add(entry.sparkPath, entry.timestamp)
   }
   seenFiles.purge()
 
   logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = 
$maxFileAgeMs")
 
-  private var unreadFiles: Seq[(String, Long)] = _
+  private var unreadFiles: Seq[(SparkPath, Long)] = _
 
   /**
    * Returns the maximum offset that can be retrieved from the source.
@@ -193,7 +193,7 @@ class FileStreamSource(
       metadataLogCurrentOffset += 1
 
       val fileEntries = batchFiles.map { case (p, timestamp) =>
-        FileEntry(path = p, timestamp = timestamp, batchId = 
metadataLogCurrentOffset)
+        FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = 
metadataLogCurrentOffset)
       }.toArray
       if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
         logInfo(s"Log offset set to $metadataLogCurrentOffset with 
${batchFiles.size} new files")
@@ -239,7 +239,7 @@ class FileStreamSource(
     val newDataSource =
       DataSource(
         sparkSession,
-        paths = files.map(f => new Path(new URI(f.path)).toString),
+        paths = files.map(_.sparkPath.toPath.toString),
         userSpecifiedSchema = Some(schema),
         partitionColumns = partitionColumns,
         className = fileFormatClassName,
@@ -286,7 +286,7 @@ class FileStreamSource(
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
-  private def fetchAllFiles(): Seq[(String, Long)] = {
+  private def fetchAllFiles(): Seq[(SparkPath, Long)] = {
     val startTime = System.nanoTime
 
     var allFiles: Seq[FileStatus] = null
@@ -318,7 +318,7 @@ class FileStreamSource(
     }
 
     val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { 
status =>
-      (status.getPath.toUri.toString, status.getModificationTime)
+      (SparkPath.fromFileStatus(status), status.getModificationTime)
     }
     val endTime = System.nanoTime
     val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
@@ -368,7 +368,12 @@ object FileStreamSource {
   val DISCARD_UNSEEN_FILES_RATIO = 0.2
   val MAX_CACHED_UNSEEN_FILES = 10000
 
-  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) 
extends Serializable
+  case class FileEntry(
+      path: String, // uri-encoded path string
+      timestamp: Timestamp,
+      batchId: Long) extends Serializable {
+    def sparkPath: SparkPath = SparkPath.fromUrlString(path)
+  }
 
   /**
    * A custom hash map used to track the list of files seen. This map is not 
thread-safe.
@@ -388,12 +393,12 @@ object FileStreamSource {
     /** Timestamp for the last purge operation. */
     private var lastPurgeTimestamp: Timestamp = 0L
 
-    @inline private def stripPathIfNecessary(path: String) = {
-      if (fileNameOnly) new Path(new URI(path)).getName else path
+    @inline private def stripPathIfNecessary(path: SparkPath) = {
+      if (fileNameOnly) path.toPath.getName else path.urlEncoded
     }
 
     /** Add a new file to the map. */
-    def add(path: String, timestamp: Timestamp): Unit = {
+    def add(path: SparkPath, timestamp: Timestamp): Unit = {
       map.put(stripPathIfNecessary(path), timestamp)
       if (timestamp > latestTimestamp) {
         latestTimestamp = timestamp
@@ -404,7 +409,7 @@ object FileStreamSource {
      * Returns true if we should consider this file a new file. The file is 
only considered "new"
      * if it is new enough that we are still tracking, and we have not seen it 
before.
      */
-    def isNewFile(path: String, timestamp: Timestamp): Boolean = {
+    def isNewFile(path: SparkPath, timestamp: Timestamp): Boolean = {
       // Note that we are testing against lastPurgeTimestamp here so we'd 
never miss a file that
       // is older than (latestTimestamp - maxAgeMs) but has not been purged 
yet.
       timestamp >= lastPurgeTimestamp && 
!map.containsKey(stripPathIfNecessary(path))
@@ -551,7 +556,7 @@ object FileStreamSource {
     }
 
     override protected def cleanTask(entry: FileEntry): Unit = {
-      val curPath = new Path(new URI(entry.path))
+      val curPath = entry.sparkPath.toPath
       val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + 
curPath.toUri.getPath)
 
       try {
@@ -575,7 +580,7 @@ object FileStreamSource {
     extends FileStreamSourceCleaner with Logging {
 
     override protected def cleanTask(entry: FileEntry): Unit = {
-      val curPath = new Path(new URI(entry.path))
+      val curPath = entry.sparkPath.toPath
       try {
         logDebug(s"Removing completed file $curPath")
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 3b81d215c7f..474de0dacae 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -881,7 +881,8 @@ class FileBasedDataSourceSuite extends QueryTest
           assert(fileScan.get.dataFilters.nonEmpty)
           assert(fileScan.get.planInputPartitions().forall { partition =>
             partition.asInstanceOf[FilePartition].files.forall { file =>
-              file.filePath.contains("p1=1") && file.filePath.contains("p2=2")
+              file.urlEncodedPath.contains("p1=1") &&
+                file.urlEncodedPath.contains("p2=2")
             }
           })
           checkAnswer(df, Row("b", 1, 2))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 86a0c4d1799..2d7cd007bee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1461,7 +1461,7 @@ class SubquerySuite extends QueryTest
           partitionFilters.exists(ExecSubqueryExpression.hasSubquery) &&
             fs.inputRDDs().forall(
               _.asInstanceOf[FileScanRDD].filePartitions.forall(
-                _.files.forall(_.filePath.contains("p=0"))))
+                _.files.forall(_.urlEncodedPath.contains("p=0"))))
         case _ => false
       })
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4dbe619610e..26655c2d95a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, 
RawLocalFileSystem
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.SparkException
+import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -283,10 +284,10 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSparkSession {
 
   test("Locality support for FileScanRDD") {
     val partition = FilePartition(0, Array(
-      PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0", 
"host1")),
-      PartitionedFile(InternalRow.empty, "fakePath0", 10, 20, Array("host1", 
"host2")),
-      PartitionedFile(InternalRow.empty, "fakePath1", 0, 5, Array("host3")),
-      PartitionedFile(InternalRow.empty, "fakePath2", 0, 5, Array("host4"))
+      PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, 
Array("host0", "host1")),
+      PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, 
Array("host1", "host2")),
+      PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, 
Array("host3")),
+      PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, Array("host4"))
     ))
 
     val fakeRDD = new FileScanRDD(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
index 771ddbd6523..b6b89ab3043 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.Files
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -37,7 +38,11 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession {
     Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8))
 
     val lines = ranges.flatMap { case (start, length) =>
-      val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, 
start, length)
+      val file = PartitionedFile(
+        InternalRow.empty,
+        SparkPath.fromPathString(path.getCanonicalPath),
+        start,
+        length)
       val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
       val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 9a374d5c302..1d2e467c94c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -278,7 +278,7 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSparkSession {
         options = Map.empty,
         hadoopConf = spark.sessionState.newHadoopConf())
       val partitionedFile = mock(classOf[PartitionedFile])
-      when(partitionedFile.filePath).thenReturn(fileStatus.getPath.toString)
+      when(partitionedFile.toPath).thenReturn(fileStatus.getPath)
       assert(reader(partitionedFile).nonEmpty === expected,
         s"Filters $filters applied to $fileStatus should be $expected.")
     }
@@ -305,7 +305,7 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSparkSession {
       hadoopConf = spark.sessionState.newHadoopConf()
     )
     val partitionedFile = mock(classOf[PartitionedFile])
-    when(partitionedFile.filePath).thenReturn(file.getPath)
+    when(partitionedFile.toPath).thenReturn(new Path(file.toURI))
     val encoder = RowEncoder(requiredSchema).resolveAndBind()
     encoder.createDeserializer().apply(reader(partitionedFile).next())
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 568b1df4c40..8c31d3c7abf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.util.stringToFile
@@ -519,7 +520,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
           .filter(_.toString.endsWith(".parquet"))
           .map(_.getFileName.toString)
           .toSet
-        val trackingFileNames = tracking.map(new Path(_).getName).toSet
+        val trackingFileNames = 
tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
         // there would be possible to have race condition:
         // - some tasks complete while abortJob is being called
@@ -569,7 +570,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
           val allFiles = sinkLog.allFiles()
           // only files from non-empty partition should be logged
           assert(allFiles.length < 10)
-          assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+          assert(allFiles.forall(file => fs.exists(file.sparkPath.toPath)))
 
           // the query should be able to read all rows correctly with metadata 
log
           val outputDf = 
spark.read.format(format).load(outputDir.getCanonicalPath)
@@ -709,14 +710,14 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
     // Read with pruning, should read only files in partition dir id=1
     checkFileScanPartitions(df.filter("id = 1")) { partitions =>
       val filesToBeRead = partitions.flatMap(_.files)
-      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.urlEncodedPath).forall(_.contains("/id=1/")))
       assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
     }
 
     // Read with pruning, should read only files in partition dir id=1 and id=2
     checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
       val filesToBeRead = partitions.flatMap(_.files)
-      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(!filesToBeRead.map(_.urlEncodedPath).exists(_.contains("/id=3/")))
       assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9b1e5a9e16e..a8a4df2ad04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.read.streaming.ReadLimit
@@ -1761,69 +1762,69 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   test("SeenFilesMap") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
-    map.add("a", 5)
+    map.add(sp("a"), 5)
     assert(map.size == 1)
     map.purge()
     assert(map.size == 1)
 
     // Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
-    map.add("b", 15)
+    map.add(sp("b"), 15)
     assert(map.size == 2)
     map.purge()
     assert(map.size == 2)
 
     // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
-    map.add("c", 16)
+    map.add(sp("c"), 16)
     assert(map.size == 3)
     map.purge()
     assert(map.size == 2)
 
     // Override existing entry shouldn't change the size
-    map.add("c", 25)
+    map.add(sp("c"), 25)
     assert(map.size == 2)
 
     // Not a new file because we have seen c before
-    assert(!map.isNewFile("c", 20))
+    assert(!map.isNewFile(sp("c"), 20))
 
     // Not a new file because timestamp is too old
-    assert(!map.isNewFile("d", 5))
+    assert(!map.isNewFile(sp("d"), 5))
 
     // Finally a new file: never seen and not too old
-    assert(map.isNewFile("e", 20))
+    assert(map.isNewFile(sp("e"), 20))
   }
 
   test("SeenFilesMap with fileNameOnly = true") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
 
-    map.add("file:///a/b/c/d", 5)
-    map.add("file:///a/b/c/e", 5)
+    map.add(sp("file:///a/b/c/d"), 5)
+    map.add(sp("file:///a/b/c/e"), 5)
     assert(map.size === 2)
 
-    assert(!map.isNewFile("d", 5))
-    assert(!map.isNewFile("file:///d", 5))
-    assert(!map.isNewFile("file:///x/d", 5))
-    assert(!map.isNewFile("file:///x/y/d", 5))
+    assert(!map.isNewFile(sp("d"), 5))
+    assert(!map.isNewFile(sp("file:///d"), 5))
+    assert(!map.isNewFile(sp("file:///x/d"), 5))
+    assert(!map.isNewFile(sp("file:///x/y/d"), 5))
 
-    map.add("s3:///bucket/d", 5)
-    map.add("s3n:///bucket/d", 5)
-    map.add("s3a:///bucket/d", 5)
+    map.add(sp("s3:///bucket/d"), 5)
+    map.add(sp("s3n:///bucket/d"), 5)
+    map.add(sp("s3a:///bucket/d"), 5)
     assert(map.size === 2)
   }
 
   test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
-    map.add("a", 20)
+    map.add(sp("a"), 20)
     assert(map.size == 1)
 
     // Timestamp 5 should still considered a new file because purge time should be 0
-    assert(map.isNewFile("b", 9))
-    assert(map.isNewFile("b", 10))
+    assert(map.isNewFile(sp("b"), 9))
+    assert(map.isNewFile(sp("b"), 10))
 
     // Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
     map.purge()
-    assert(!map.isNewFile("b", 9))
-    assert(map.isNewFile("b", 10))
+    assert(!map.isNewFile(sp("b"), 9))
+    assert(map.isNewFile(sp("b"), 10))
   }
 
   test("do not recheck that files exist during getBatch") {
@@ -2197,7 +2198,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
           val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
           assert(files.forall(_.batchId == batchId))
 
-          val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+          val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
           val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
             .map(_.getCanonicalPath)
           assert(actualInputFiles === expectedInputFiles)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index aff014261ba..a9314397dcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive.orc
 
-import java.net.URI
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -152,7 +151,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
 
-      val filePath = new Path(new URI(file.filePath))
+      val filePath = file.toPath
 
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -166,7 +165,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
-          FileInputFormat.setInputPaths(job, file.filePath)
+          FileInputFormat.setInputPaths(job, file.urlEncodedPath)
 
           // Custom OrcRecordReader is used to get
           // ObjectInspector during recordReader creation itself and can


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to