This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new faedcd91d55 [SPARK-41970] Introduce SparkPath for typesafety faedcd91d55 is described below commit faedcd91d554a00fc76116a0c188752cf036f907 Author: David Lewis <david.le...@databricks.com> AuthorDate: Thu Jan 19 10:05:51 2023 +0800 [SPARK-41970] Introduce SparkPath for typesafety ### What changes were proposed in this pull request? This PR proposes a strongly typed `SparkPath` that encapsulates a url-encoded string. It has helper methods for creating hadoop paths, uris, and uri-encoded strings. The intent is to identify and fix various bugs in the way that Spark handles these paths. To do this we introduced the SparkPath type to `PartitionFile` (a widely used class), and then started fixing compile errors. In doing so we fixed various bugs. ### Why are the changes needed? Given `val str = "s3://bucket/path with space/a"` There is a difference between `new Path(str)` and `new Path(new URI(str))`, and thus a difference between `new URI(str)` and `new Path(str).toUri`. Both `URI` and `Path` are symmetric in construction and `toString`, but are not interchangeable. Spark confuses these two paths (uri-encoded vs not). This PR attempts to use types to disambiguate them. ### Does this PR introduce _any_ user-facing change? This PR proposes changing the public API of `PartitionedFile`, and various other methods in the name of type safety. It needs to be clear to callers of an API what type of path string is expected. ### How was this patch tested? We rely on existing tests, and update the default temp path creation to include paths with spaces. Closes #39488 from databricks-david-lewis/SPARK_PATH. Authored-by: David Lewis <david.le...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/avro/AvroFileFormat.scala | 5 +- .../sql/v2/avro/AvroPartitionReaderFactory.scala | 7 +-- .../apache/spark/sql/avro/AvroRowReaderSuite.scala | 6 +-- .../org/apache/spark/sql/avro/AvroSuite.scala | 3 +- .../apache/spark/deploy/worker/WorkerWatcher.scala | 2 +- .../scala/org/apache/spark/paths/SparkPath.scala | 55 ++++++++++++++++++++++ .../scala/org/apache/spark/rpc/RpcAddress.scala | 2 +- .../spark/ml/source/image/ImageFileFormat.scala | 6 +-- scalastyle-config.xml | 8 ++++ .../main/scala/org/apache/spark/sql/Dataset.scala | 7 +-- .../spark/sql/execution/DataSourceScanExec.scala | 4 +- .../apache/spark/sql/execution/FileRelation.scala | 4 +- .../spark/sql/execution/PartitionedFileUtil.scala | 5 +- .../execution/datasources/CatalogFileIndex.scala | 3 +- .../sql/execution/datasources/DataSource.scala | 3 +- .../sql/execution/datasources/FileIndex.scala | 3 +- .../sql/execution/datasources/FileScanRDD.scala | 21 ++++++--- .../datasources/HadoopFileLinesReader.scala | 4 +- .../datasources/HadoopFileWholeTextReader.scala | 4 +- .../execution/datasources/HadoopFsRelation.scala | 3 +- .../datasources/PartitioningAwareFileIndex.scala | 5 +- .../datasources/binaryfile/BinaryFileFormat.scala | 3 +- .../execution/datasources/csv/CSVDataSource.scala | 3 +- .../execution/datasources/csv/CSVFileFormat.scala | 2 +- .../datasources/json/JsonDataSource.scala | 5 +- .../execution/datasources/orc/OrcFileFormat.scala | 3 +- .../datasources/parquet/ParquetFileFormat.scala | 4 +- .../datasources/v2/FilePartitionReader.scala | 8 ++-- .../sql/execution/datasources/v2/FileScan.scala | 2 +- .../v2/csv/CSVPartitionReaderFactory.scala | 2 +- .../v2/orc/OrcPartitionReaderFactory.scala | 10 ++-- 
.../v2/parquet/ParquetPartitionReaderFactory.scala | 11 ++--- .../execution/streaming/FileStreamSinkLog.scala | 17 ++++--- .../sql/execution/streaming/FileStreamSource.scala | 35 ++++++++------ .../spark/sql/FileBasedDataSourceSuite.scala | 3 +- .../scala/org/apache/spark/sql/SubquerySuite.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 9 ++-- .../datasources/HadoopFileLinesReaderSuite.scala | 7 ++- .../binaryfile/BinaryFileFormatSuite.scala | 4 +- .../spark/sql/streaming/FileStreamSinkSuite.scala | 9 ++-- .../sql/streaming/FileStreamSourceSuite.scala | 45 +++++++++--------- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 5 +- 42 files changed, 216 insertions(+), 133 deletions(-) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala index a13e0624f35..3e16e121081 100755 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.avro import java.io._ -import java.net.URI import scala.util.control.NonFatal @@ -96,9 +95,9 @@ private[sql] class AvroFileFormat extends FileFormat // Doing input file filtering is improper because we may generate empty tasks that process no // input files but stress the scheduler. We should probably add a more general input file // filtering mechanism for `FileFormat` data sources. See SPARK-16317. - if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) { + if (parsedOptions.ignoreExtension || file.urlEncodedPath.endsWith(".avro")) { val reader = { - val in = new FsInput(new Path(new URI(file.filePath)), conf) + val in = new FsInput(file.toPath, conf) try { val datumReader = userProvidedSchema match { case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala index 3ad63f113fe..cc7bd180e84 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala @@ -16,14 +16,11 @@ */ package org.apache.spark.sql.v2.avro -import java.net.URI - import scala.util.control.NonFatal import org.apache.avro.file.DataFileReader import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.mapred.FsInput -import org.apache.hadoop.fs.Path import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -62,9 +59,9 @@ case class AvroPartitionReaderFactory( val conf = broadcastedConf.value.value val userProvidedSchema = options.schema - if (options.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) { + if (options.ignoreExtension || partitionedFile.urlEncodedPath.endsWith(".avro")) { val reader = { - val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf) + val in = new FsInput(partitionedFile.toPath, conf) try { val datumReader = userProvidedSchema match { case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema) diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala index 53064371b2a..15b1e5ecf88 100644 --- 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroRowReaderSuite.scala @@ -18,13 +18,11 @@ package org.apache.spark.sql.avro import java.io._ -import java.net.URI import org.apache.avro.file.DataFileReader import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.mapred.FsInput import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql._ @@ -62,8 +60,8 @@ class AvroRowReaderSuite case BatchScanExec(_, f: AvroScan, _, _, _, _, _) => f } val filePath = fileScan.get.fileIndex.inputFiles(0) - val fileSize = new File(new URI(filePath)).length - val in = new FsInput(new Path(new URI(filePath)), new Configuration()) + val fileSize = new File(filePath.toUri).length + val in = new FsInput(filePath.toPath, new Configuration()) val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()) val it = new Iterator[InternalRow] with AvroUtils.RowReader { diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index f74274b0a3c..d4e85addf95 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -2357,7 +2357,8 @@ class AvroV2Suite extends AvroSuite with ExplainSuiteHelper { assert(fileScan.get.dataFilters.nonEmpty) assert(fileScan.get.planInputPartitions().forall { partition => partition.asInstanceOf[FilePartition].files.forall { file => - file.filePath.contains("p1=1") && file.filePath.contains("p2=2") + file.urlEncodedPath.contains("p1=1") && + file.urlEncodedPath.contains("p2=2") } }) checkAnswer(df, Row("b", 1, 2)) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala index b7a5728dd00..deb5bb1a697 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -47,7 +47,7 @@ private[spark] class WorkerWatcher( private[deploy] var isShutDown = false // Lets filter events only from the worker's rpc system - private val expectedAddress = RpcAddress.fromURIString(workerUrl) + private val expectedAddress = RpcAddress.fromUrlString(workerUrl) private def isWorker(address: RpcAddress) = expectedAddress == address private def exitNonZero() = diff --git a/core/src/main/scala/org/apache/spark/paths/SparkPath.scala b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala new file mode 100644 index 00000000000..5bc6233f6cf --- /dev/null +++ b/core/src/main/scala/org/apache/spark/paths/SparkPath.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.paths + +import java.net.URI + +import org.apache.hadoop.fs.{FileStatus, Path} + +/** + * A canonical representation of a file path. This class is intended to provide + * type-safety to the way that Spark handles Paths. Paths can be represented as + * Strings in multiple ways, which are not always compatible. Spark regularly uses + * two ways: 1. hadoop Path.toString and java URI.toString. + */ +case class SparkPath private (private val underlying: String) { + def urlEncoded: String = underlying + def toUri: URI = new URI(underlying) + def toPath: Path = new Path(toUri) + override def toString: String = underlying +} + +object SparkPath { + /** + * Creates a SparkPath from a hadoop Path string. + * Please be very sure that the provided string is encoded (or not encoded) in the right way. + * + * Please see the hadoop Path documentation here: + * https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/Path.html#Path-java.lang.String- + */ + def fromPathString(str: String): SparkPath = fromPath(new Path(str)) + def fromPath(path: Path): SparkPath = fromUri(path.toUri) + def fromFileStatus(fs: FileStatus): SparkPath = fromPath(fs.getPath) + + /** + * Creates a SparkPath from a url-encoded string. + * Note: It is the responsibility of the caller to ensure that str is a valid url-encoded string. + */ + def fromUrlString(str: String): SparkPath = SparkPath(str) + def fromUri(uri: URI): SparkPath = fromUrlString(uri.toString) +} diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala index 9b0739c9447..675dc24206a 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcAddress.scala @@ -39,7 +39,7 @@ private[spark] case class RpcAddress(_host: String, port: Int) { private[spark] object RpcAddress { /** Return the [[RpcAddress]] represented by `uri`. 
*/ - def fromURIString(uri: String): RpcAddress = { + def fromUrlString(uri: String): RpcAddress = { val uriObj = new java.net.URI(uri) RpcAddress(uriObj.getHost, uriObj.getPort) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala index 0995df51c64..206ce6f0675 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.source.image import com.google.common.io.{ByteStreams, Closeables} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.Job import org.apache.spark.ml.image.ImageSchema @@ -71,8 +71,8 @@ private[image] class ImageFileFormat extends FileFormat with DataSourceRegister if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) { Iterator(emptyUnsafeRow) } else { - val origin = file.filePath - val path = new Path(origin) + val origin = file.urlEncodedPath + val path = file.toPath val fs = path.getFileSystem(broadcastedHadoopConf.value.value) val stream = fs.open(path) val bytes = try { diff --git a/scalastyle-config.xml b/scalastyle-config.xml index f34b5d55e42..3dcb03b13fd 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -437,4 +437,12 @@ This file is divided into 3 sections: Use org.apache.spark.util.Utils.createTempDir instead. </customMessage> </check> + + <check customId="pathfromuri" level="error" class="org.scalastyle.file.RegexChecker" enabled="true"> + <parameters><parameter name="regex">new Path\(new URI\(</parameter></parameters> + <customMessage><![CDATA[ + Are you sure that this string is uri encoded? Please be careful when converting hadoop Paths + and URIs to and from String. If possible, please use SparkPath. 
+ ]]></customMessage> + </check> </scalastyle> diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index c8e2a48859d..88c4fe511a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -34,6 +34,7 @@ import org.apache.spark.api.java.function._ import org.apache.spark.api.python.{PythonRDD, SerDeUtil} import org.apache.spark.api.r.RRDD import org.apache.spark.broadcast.Broadcast +import org.apache.spark.paths.SparkPath import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -3924,18 +3925,18 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def inputFiles: Array[String] = { - val files: Seq[String] = queryExecution.optimizedPlan.collect { + val files: Seq[SparkPath] = queryExecution.optimizedPlan.collect { case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) => fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles case r: HiveTableRelation => - r.tableMeta.storage.locationUri.map(_.toString).toArray + r.tableMeta.storage.locationUri.map(SparkPath.fromUri).toArray case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _) => table.fileIndex.inputFiles }.flatten - files.toSet.toArray + files.iterator.map(_.urlEncoded).toSet.toArray } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 8dda88e86c0..0f4b8c563d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -632,8 +632,8 @@ case class FileSourceScanExec( } }.groupBy { f => BucketingUtils - .getBucketId(new Path(f.filePath).getName) - .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath)) + .getBucketId(f.toPath.getName) + .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.urlEncodedPath)) } val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala index a299fed7fd1..6a832b784fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import org.apache.spark.paths.SparkPath + /** * An interface for relations that are backed by files. When a class implements this interface, * the list of paths that it returns will be returned to a user who calls `inputPaths` on any @@ -24,5 +26,5 @@ package org.apache.spark.sql.execution */ trait FileRelation { /** Returns the list of files that will be read when scanning this relation. 
*/ - def inputFiles: Array[String] + def inputFiles: Array[SparkPath] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 4cccd4132e9..fd5f2f25c0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources._ @@ -36,7 +37,7 @@ object PartitionedFileUtil { val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining val hosts = getBlockHosts(getBlockLocations(file), offset, size) - PartitionedFile(partitionValues, filePath.toUri.toString, offset, size, hosts, + PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts, file.getModificationTime, file.getLen) } } else { @@ -49,7 +50,7 @@ object PartitionedFileUtil { filePath: Path, partitionValues: InternalRow): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen) - PartitionedFile(partitionValues, filePath.toUri.toString, 0, file.getLen, hosts, + PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts, file.getModificationTime, file.getLen) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 727b33018fb..f12b72f6867 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -22,6 +22,7 @@ import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalogUtils} import org.apache.spark.sql.catalyst.expressions._ @@ -94,7 +95,7 @@ class CatalogFileIndex( } } - override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles + override def inputFiles: Array[SparkPath] = filterPartitions(Nil).inputFiles // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ad26ee21c2c..94dd3bc0bd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -70,7 +70,8 @@ import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils} * * @param paths A list of file system paths that hold data. These will be globbed before if * the "__globPaths__" option is true, and will be qualified. This option only works - * when reading from a [[FileFormat]]. + * when reading from a [[FileFormat]]. 
These paths are expected to be hadoop [[Path]] + * strings. * @param userSpecifiedSchema An optional specification of the schema of the data. When present * we skip attempting to infer the schema. * @param partitionColumns A list of column names that the relation is partitioned by. This list is diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 7bfc781797e..d9a63edca73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.fs._ +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.StructType @@ -62,7 +63,7 @@ trait FileIndex { * Returns the list of files that will be read when scanning this relation. This call may be * very expensive for large tables. */ - def inputFiles: Array[String] + def inputFiles: Array[SparkPath] /** Refresh any cached file listings */ def refresh(): Unit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 827d41dd096..0ccf72823f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources import java.io.{Closeable, FileNotFoundException, IOException} +import java.net.URI import scala.util.control.NonFatal @@ -25,6 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Partition => RDDPartition, SparkUpgradeException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.paths.SparkPath import org.apache.spark.rdd.{InputFileBlockHolder, RDD} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{FileSourceOptions, InternalRow} @@ -51,12 +53,17 @@ import org.apache.spark.util.NextIterator */ case class PartitionedFile( partitionValues: InternalRow, - filePath: String, + filePath: SparkPath, start: Long, length: Long, @transient locations: Array[String] = Array.empty, modificationTime: Long = 0L, fileSize: Long = 0L) { + + def pathUri: URI = filePath.toUri + def toPath: Path = filePath.toPath + def urlEncodedPath: String = filePath.urlEncoded + override def toString: String = { s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues" } @@ -140,14 +147,14 @@ class FileScanRDD( private def updateMetadataRow(): Unit = if (metadataColumns.nonEmpty && currentFile != null) { updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name), - new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime) + currentFile.toPath, currentFile.fileSize, currentFile.modificationTime) } /** * Create an array of constant column vectors containing all required metadata columns */ private def createMetadataColumnVector(c: ColumnarBatch): Array[ColumnVector] = { - val path = new Path(currentFile.filePath) + val path = currentFile.toPath metadataColumns.map(_.name).map { case FILE_PATH => val columnVector = new ConstantColumnVector(c.numRows(), StringType) @@ -223,7 
+230,8 @@ class FileScanRDD( updateMetadataRow() logInfo(s"Reading File $currentFile") // Sets InputFileBlockHolder for the file block's information - InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length) + InputFileBlockHolder + .set(currentFile.urlEncodedPath, currentFile.start, currentFile.length) resetCurrentIterator() if (ignoreMissingFiles || ignoreCorruptFiles) { @@ -278,12 +286,13 @@ class FileScanRDD( } catch { case e: SchemaColumnConvertNotSupportedException => throw QueryExecutionErrors.unsupportedSchemaColumnConvertError( - currentFile.filePath, e.getColumn, e.getLogicalType, e.getPhysicalType, e) + currentFile.urlEncodedPath, e.getColumn, e.getLogicalType, e.getPhysicalType, e) case sue: SparkUpgradeException => throw sue case NonFatal(e) => e.getCause match { case sue: SparkUpgradeException => throw sue - case _ => throw QueryExecutionErrors.cannotReadFilesError(e, currentFile.filePath) + case _ => + throw QueryExecutionErrors.cannotReadFilesError(e, currentFile.urlEncodedPath) } } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala index b5e276bd421..5ec17290c37 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReader.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.datasources import java.io.Closeable -import java.net.URI import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileSplit, LineRecordReader} @@ -48,7 +46,7 @@ class HadoopFileLinesReader( private val _iterator = { val fileSplit = new FileSplit( - new Path(new URI(file.filePath)), + file.toPath, file.start, file.length, // The locality is decided by `getPreferredLocations` in `FileScanRDD`. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala index a48001f04a9..17649f62d84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFileWholeTextReader.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.execution.datasources import java.io.Closeable -import java.net.URI import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit @@ -37,7 +35,7 @@ class HadoopFileWholeTextReader(file: PartitionedFile, conf: Configuration) extends Iterator[Text] with Closeable { private val _iterator = { val fileSplit = new CombineFileSplit( - Array(new Path(new URI(file.filePath))), + Array(file.toPath), Array(file.start), Array(file.length), // The locality is decided by `getPreferredLocations` in `FileScanRDD`. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index fd1824055dc..bd04ddb2ec6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.execution.FileRelation @@ -70,5 +71,5 @@ case class HadoopFsRelation( } - override def inputFiles: Array[String] = location.inputFiles + override def inputFiles: Array[SparkPath] = location.inputFiles } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 137fd6fe1ac..2d8c7b19507 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.spark.internal.Logging +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ @@ -136,8 +137,8 @@ abstract class PartitioningAwareFileIndex( } /** Returns the list of files that will be read when scanning this relation. 
*/ - override def inputFiles: Array[String] = - allFiles().map(_.getPath.toUri.toString).toArray + override def inputFiles: Array[SparkPath] = + allFiles().map(SparkPath.fromFileStatus).toArray override def sizeInBytes: Long = allFiles().map(_.getLen).sum diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala index 43512ff5ac8..ba6d351761e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.binaryfile -import java.net.URI import java.sql.Timestamp import com.google.common.io.{ByteStreams, Closeables} @@ -101,7 +100,7 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister { val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH) file: PartitionedFile => { - val path = new Path(new URI(file.filePath)) + val path = file.toPath val fs = path.getFileSystem(broadcastedHadoopConf.value.value) val status = fs.getFileStatus(path) if (filterFuncs.forall(_.apply(status))) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index d8fa768a604..99d43953c4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.csv -import java.net.URI import java.nio.charset.{Charset, StandardCharsets} import com.univocity.parsers.csv.CsvParser @@ -179,7 +178,7 @@ object MultiLineCSVDataSource extends CSVDataSource { headerChecker: CSVHeaderChecker, requiredSchema: StructType): Iterator[InternalRow] = { UnivocityParser.parseStream( - CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))), + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath), parser, headerChecker, requiredSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 93679516a8c..2a6c209ff0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -128,7 +128,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { val schema = if (columnPruning) actualRequiredSchema else actualDataSchema val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( - schema, parsedOptions, source = s"CSV file: ${file.filePath}", isStartOfFile) + schema, parsedOptions, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) CSVDataSource(parsedOptions).readFile( conf, file, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 2f4cd468457..7c98c31bba2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.json import java.io.InputStream -import java.net.URI import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import com.google.common.io.ByteStreams @@ -211,7 +210,7 @@ object MultiLineJsonDataSource extends JsonDataSource { schema: StructType): Iterator[InternalRow] = { def partitionedFileString(ignored: Any): UTF8String = { Utils.tryWithResource { - CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))) + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath) } { inputStream => UTF8String.fromBytes(ByteStreams.toByteArray(inputStream)) } @@ -227,6 +226,6 @@ object MultiLineJsonDataSource extends JsonDataSource { parser.options.columnNameOfCorruptRecord) safeParser.parse( - CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))) + CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 6a58513c346..cb18566e848 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.datasources.orc import java.io._ -import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -164,7 +163,7 @@ class OrcFileFormat (file: PartitionedFile) => { val conf = broadcastedConf.value.value - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath val fs = filePath.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 80b6791d8fa..6b4651e3260 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import java.net.URI - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} @@ -200,7 +198,7 @@ class ParquetFileFormat (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) val sharedConf = broadcastedHadoopConf.value.value diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala index 782c1f50d80..7159bc6de3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala @@ -66,7 +66,8 @@ class FilePartitionReader[T]( } catch { case e: SchemaColumnConvertNotSupportedException => 
throw QueryExecutionErrors.unsupportedSchemaColumnConvertError( - currentReader.file.filePath, e.getColumn, e.getLogicalType, e.getPhysicalType, e) + currentReader.file.urlEncodedPath, + e.getColumn, e.getLogicalType, e.getPhysicalType, e) case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles => logWarning( s"Skipped the rest of the content in the corrupted file: $currentReader", e) @@ -75,7 +76,8 @@ class FilePartitionReader[T]( case NonFatal(e) => e.getCause match { case sue: SparkUpgradeException => throw sue - case _ => throw QueryExecutionErrors.cannotReadFilesError(e, currentReader.file.filePath) + case _ => throw QueryExecutionErrors.cannotReadFilesError(e, + currentReader.file.urlEncodedPath) } } if (hasNext) { @@ -101,7 +103,7 @@ class FilePartitionReader[T]( logInfo(s"Reading file $reader") // Sets InputFileBlockHolder for the file block's information val file = reader.file - InputFileBlockHolder.set(file.filePath, file.start, file.length) + InputFileBlockHolder.set(file.urlEncodedPath, file.start, file.length) reader } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 9b6f9932866..0cfb55ab407 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -163,7 +163,7 @@ trait FileScan extends Scan } if (splitFiles.length == 1) { - val path = new Path(splitFiles(0).filePath) + val path = splitFiles(0).toPath if (!isSplitable(path) && splitFiles(0).length > sparkSession.sparkContext.getConf.get(IO_WARNING_LARGEFILETHRESHOLD)) { logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala index f8a17c8eaa8..37f6ae4aaa9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVPartitionReaderFactory.scala @@ -61,7 +61,7 @@ case class CSVPartitionReaderFactory( val schema = if (options.columnPruning) actualReadDataSchema else actualDataSchema val isStartOfFile = file.start == 0 val headerChecker = new CSVHeaderChecker( - schema, options, source = s"CSV file: ${file.filePath}", isStartOfFile) + schema, options, source = s"CSV file: ${file.urlEncodedPath}", isStartOfFile) val iter = CSVDataSource(options).readFile( conf, file, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 4f93a67cc46..2b7bdae6b31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution.datasources.v2.orc -import java.net.URI - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} @@ -86,7 +84,7 @@ case class OrcPartitionReaderFactory( if 
(aggregation.nonEmpty) { return buildReaderWithAggregates(file, conf) } - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema) val resultedColPruneInfo = OrcUtils.requestedColumnIds( @@ -127,7 +125,7 @@ case class OrcPartitionReaderFactory( if (aggregation.nonEmpty) { return buildColumnarReaderWithAggregates(file, conf) } - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf))(_.getSchema) val resultedColPruneInfo = OrcUtils.requestedColumnIds( @@ -181,7 +179,7 @@ case class OrcPartitionReaderFactory( private def buildReaderWithAggregates( file: PartitionedFile, conf: Configuration): PartitionReader[InternalRow] = { - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath new PartitionReader[InternalRow] { private var hasNext = true private lazy val row: InternalRow = { @@ -209,7 +207,7 @@ case class OrcPartitionReaderFactory( private def buildColumnarReaderWithAggregates( file: PartitionedFile, conf: Configuration): PartitionReader[ColumnarBatch] = { - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath new PartitionReader[ColumnarBatch] { private var hasNext = true private lazy val batch: ColumnarBatch = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 121ebe1cfa2..5951c1d8dd9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -16,10 +16,8 @@ */ package org.apache.spark.sql.execution.datasources.v2.parquet -import java.net.URI import java.time.ZoneId -import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -89,7 +87,7 @@ case class ParquetPartitionReaderFactory( private def getFooter(file: PartitionedFile): ParquetMetadata = { val conf = broadcastedConf.value.value - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath if (aggregation.isEmpty) { ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS) @@ -132,7 +130,8 @@ case class ParquetPartitionReaderFactory( val footer = getFooter(file) if (footer != null && footer.getBlocks.size > 0) { - ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath, dataSchema, + ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, + dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, getDatetimeRebaseSpec(footer.getFileMetaData)) } else { @@ -175,7 +174,7 @@ case class ParquetPartitionReaderFactory( private val batch: ColumnarBatch = { val footer = getFooter(file) if (footer != null && footer.getBlocks.size > 0) { - val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.filePath, + val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, getDatetimeRebaseSpec(footer.getFileMetaData)) AggregatePushDownUtils.convertAggregatesRowToBatch( @@ -209,7 +208,7 @@ case class 
ParquetPartitionReaderFactory( RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value - val filePath = new Path(new URI(file.filePath)) + val filePath = file.toPath val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) lazy val footerFileMetaData = getFooter(file).getFileMetaData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala index 2d70d95c685..94ba8b8aa51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution.streaming -import java.net.URI - -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.fs.FileStatus import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.internal.SQLConf @@ -30,7 +29,7 @@ import org.apache.spark.sql.internal.SQLConf * The status of a file outputted by [[FileStreamSink]]. A file is visible only if it appears in * the sink log and its action is not "delete". * - * @param path the file path. + * @param path the file path as a uri-encoded string. * @param size the file size. * @param isDir whether this file is a directory. * @param modificationTime the file last modification time. @@ -46,17 +45,23 @@ case class SinkFileStatus( blockReplication: Int, blockSize: Long, action: String) { + def sparkPath: SparkPath = SparkPath.fromPathString(path) def toFileStatus: FileStatus = { new FileStatus( - size, isDir, blockReplication, blockSize, modificationTime, new Path(new URI(path))) + size, + isDir, + blockReplication, + blockSize, + modificationTime, + SparkPath.fromUrlString(path).toPath) } } object SinkFileStatus { def apply(f: FileStatus): SinkFileStatus = { SinkFileStatus( - path = f.getPath.toUri.toString, + path = SparkPath.fromPath(f.getPath).urlEncoded, size = f.getLen, isDir = f.isDirectory, modificationTime = f.getModificationTime, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 5baf3d29a49..6eb2ffef44e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.streaming -import java.net.URI import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit._ @@ -28,6 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.read.streaming @@ -109,16 +109,16 @@ class FileStreamSource( // Visible for testing and debugging in production. 
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly) - private var allFilesForTriggerAvailableNow: Seq[(String, Long)] = _ + private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _ metadataLog.restore().foreach { entry => - seenFiles.add(entry.path, entry.timestamp) + seenFiles.add(entry.sparkPath, entry.timestamp) } seenFiles.purge() logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs") - private var unreadFiles: Seq[(String, Long)] = _ + private var unreadFiles: Seq[(SparkPath, Long)] = _ /** * Returns the maximum offset that can be retrieved from the source. @@ -193,7 +193,7 @@ class FileStreamSource( metadataLogCurrentOffset += 1 val fileEntries = batchFiles.map { case (p, timestamp) => - FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset) + FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset) }.toArray if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) { logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files") @@ -239,7 +239,7 @@ class FileStreamSource( val newDataSource = DataSource( sparkSession, - paths = files.map(f => new Path(new URI(f.path)).toString), + paths = files.map(_.sparkPath.toPath.toString), userSpecifiedSchema = Some(schema), partitionColumns = partitionColumns, className = fileFormatClassName, @@ -286,7 +286,7 @@ class FileStreamSource( /** * Returns a list of files found, sorted by their timestamp. */ - private def fetchAllFiles(): Seq[(String, Long)] = { + private def fetchAllFiles(): Seq[(SparkPath, Long)] = { val startTime = System.nanoTime var allFiles: Seq[FileStatus] = null @@ -318,7 +318,7 @@ class FileStreamSource( } val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status => - (status.getPath.toUri.toString, status.getModificationTime) + (SparkPath.fromFileStatus(status), status.getModificationTime) } val endTime = System.nanoTime val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime) @@ -368,7 +368,12 @@ object FileStreamSource { val DISCARD_UNSEEN_FILES_RATIO = 0.2 val MAX_CACHED_UNSEEN_FILES = 10000 - case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable + case class FileEntry( + path: String, // uri-encoded path string + timestamp: Timestamp, + batchId: Long) extends Serializable { + def sparkPath: SparkPath = SparkPath.fromUrlString(path) + } /** * A custom hash map used to track the list of files seen. This map is not thread-safe. @@ -388,12 +393,12 @@ object FileStreamSource { /** Timestamp for the last purge operation. */ private var lastPurgeTimestamp: Timestamp = 0L - @inline private def stripPathIfNecessary(path: String) = { - if (fileNameOnly) new Path(new URI(path)).getName else path + @inline private def stripPathIfNecessary(path: SparkPath) = { + if (fileNameOnly) path.toPath.getName else path.urlEncoded } /** Add a new file to the map. */ - def add(path: String, timestamp: Timestamp): Unit = { + def add(path: SparkPath, timestamp: Timestamp): Unit = { map.put(stripPathIfNecessary(path), timestamp) if (timestamp > latestTimestamp) { latestTimestamp = timestamp @@ -404,7 +409,7 @@ object FileStreamSource { * Returns true if we should consider this file a new file. The file is only considered "new" * if it is new enough that we are still tracking, and we have not seen it before. 
*/ - def isNewFile(path: String, timestamp: Timestamp): Boolean = { + def isNewFile(path: SparkPath, timestamp: Timestamp): Boolean = { // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that // is older than (latestTimestamp - maxAgeMs) but has not been purged yet. timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path)) @@ -551,7 +556,7 @@ object FileStreamSource { } override protected def cleanTask(entry: FileEntry): Unit = { - val curPath = new Path(new URI(entry.path)) + val curPath = entry.sparkPath.toPath val newPath = new Path(baseArchivePath.toString.stripSuffix("/") + curPath.toUri.getPath) try { @@ -575,7 +580,7 @@ object FileStreamSource { extends FileStreamSourceCleaner with Logging { override protected def cleanTask(entry: FileEntry): Unit = { - val curPath = new Path(new URI(entry.path)) + val curPath = entry.sparkPath.toPath try { logDebug(s"Removing completed file $curPath") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 3b81d215c7f..474de0dacae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -881,7 +881,8 @@ class FileBasedDataSourceSuite extends QueryTest assert(fileScan.get.dataFilters.nonEmpty) assert(fileScan.get.planInputPartitions().forall { partition => partition.asInstanceOf[FilePartition].files.forall { file => - file.filePath.contains("p1=1") && file.filePath.contains("p2=2") + file.urlEncodedPath.contains("p1=1") && + file.urlEncodedPath.contains("p2=2") } }) checkAnswer(df, Row("b", 1, 2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 86a0c4d1799..2d7cd007bee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1461,7 +1461,7 @@ class SubquerySuite extends QueryTest partitionFilters.exists(ExecSubqueryExpression.hasSubquery) && fs.inputRDDs().forall( _.asInstanceOf[FileScanRDD].filePartitions.forall( - _.files.forall(_.filePath.contains("p=0")))) + _.files.forall(_.urlEncodedPath.contains("p=0")))) case _ => false }) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 4dbe619610e..26655c2d95a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem import org.apache.hadoop.mapreduce.Job import org.apache.spark.SparkException +import org.apache.spark.paths.SparkPath.{fromUrlString => sp} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec @@ -283,10 +284,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { test("Locality support for FileScanRDD") { val partition = FilePartition(0, Array( - PartitionedFile(InternalRow.empty, "fakePath0", 0, 10, Array("host0", "host1")), - PartitionedFile(InternalRow.empty, "fakePath0", 10, 20, 
Array("host1", "host2")), - PartitionedFile(InternalRow.empty, "fakePath1", 0, 5, Array("host3")), - PartitionedFile(InternalRow.empty, "fakePath2", 0, 5, Array("host4")) + PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, Array("host0", "host1")), + PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, Array("host1", "host2")), + PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, Array("host3")), + PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, Array("host4")) )) val fakeRDD = new FileScanRDD( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala index 771ddbd6523..b6b89ab3043 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala @@ -23,6 +23,7 @@ import java.nio.file.Files import org.apache.hadoop.conf.Configuration +import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.test.SharedSparkSession @@ -37,7 +38,11 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession { Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8)) val lines = ranges.flatMap { case (start, length) => - val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length) + val file = PartitionedFile( + InternalRow.empty, + SparkPath.fromPathString(path.getCanonicalPath), + start, + length) val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf()) val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala index 9a374d5c302..1d2e467c94c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala @@ -278,7 +278,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { options = Map.empty, hadoopConf = spark.sessionState.newHadoopConf()) val partitionedFile = mock(classOf[PartitionedFile]) - when(partitionedFile.filePath).thenReturn(fileStatus.getPath.toString) + when(partitionedFile.toPath).thenReturn(fileStatus.getPath) assert(reader(partitionedFile).nonEmpty === expected, s"Filters $filters applied to $fileStatus should be $expected.") } @@ -305,7 +305,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession { hadoopConf = spark.sessionState.newHadoopConf() ) val partitionedFile = mock(classOf[PartitionedFile]) - when(partitionedFile.filePath).thenReturn(file.getPath) + when(partitionedFile.toPath).thenReturn(new Path(file.toURI)) val encoder = RowEncoder(requiredSchema).resolveAndBind() encoder.createDeserializer().apply(reader(partitionedFile).next()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index 568b1df4c40..8c31d3c7abf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 568b1df4c40..8c31d3c7abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.JobContext
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.util.stringToFile
@@ -519,7 +520,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
         .filter(_.toString.endsWith(".parquet"))
         .map(_.getFileName.toString)
         .toSet
-      val trackingFileNames = tracking.map(new Path(_).getName).toSet
+      val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
       // there would be possible to have race condition:
       // - some tasks complete while abortJob is being called
@@ -569,7 +570,7 @@ abstract class FileStreamSinkSuite extends StreamTest {
       val allFiles = sinkLog.allFiles()
       // only files from non-empty partition should be logged
       assert(allFiles.length < 10)
-      assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+      assert(allFiles.forall(file => fs.exists(file.sparkPath.toPath)))
 
       // the query should be able to read all rows correctly with metadata log
       val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
@@ -709,14 +710,14 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
     // Read with pruning, should read only files in partition dir id=1
     checkFileScanPartitions(df.filter("id = 1")) { partitions =>
       val filesToBeRead = partitions.flatMap(_.files)
-      assert(filesToBeRead.map(_.filePath).forall(_.contains("/id=1/")))
+      assert(filesToBeRead.map(_.urlEncodedPath).forall(_.contains("/id=1/")))
       assert(filesToBeRead.map(_.partitionValues).distinct.size === 1)
     }
 
     // Read with pruning, should read only files in partition dir id=1 and id=2
     checkFileScanPartitions(df.filter("id in (1,2)")) { partitions =>
       val filesToBeRead = partitions.flatMap(_.files)
-      assert(!filesToBeRead.map(_.filePath).exists(_.contains("/id=3/")))
+      assert(!filesToBeRead.map(_.urlEncodedPath).exists(_.contains("/id=3/")))
       assert(filesToBeRead.map(_.partitionValues).distinct.size === 2)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 9b1e5a9e16e..a8a4df2ad04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.paths.SparkPath.{fromUrlString => sp}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.read.streaming.ReadLimit
@@ -1761,69 +1762,69 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   test("SeenFilesMap") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
-    map.add("a", 5)
+    map.add(sp("a"), 5)
     assert(map.size == 1)
     map.purge()
     assert(map.size == 1)
 
     // Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
-    map.add("b", 15)
+    map.add(sp("b"), 15)
     assert(map.size == 2)
     map.purge()
     assert(map.size == 2)
 
     // Add a new entry that's more than 10 ms than the first entry. We should be able to purge now.
-    map.add("c", 16)
+    map.add(sp("c"), 16)
     assert(map.size == 3)
     map.purge()
     assert(map.size == 2)
 
     // Override existing entry shouldn't change the size
-    map.add("c", 25)
+    map.add(sp("c"), 25)
     assert(map.size == 2)
 
     // Not a new file because we have seen c before
-    assert(!map.isNewFile("c", 20))
+    assert(!map.isNewFile(sp("c"), 20))
 
     // Not a new file because timestamp is too old
-    assert(!map.isNewFile("d", 5))
+    assert(!map.isNewFile(sp("d"), 5))
 
     // Finally a new file: never seen and not too old
-    assert(map.isNewFile("e", 20))
+    assert(map.isNewFile(sp("e"), 20))
   }
 
   test("SeenFilesMap with fileNameOnly = true") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
 
-    map.add("file:///a/b/c/d", 5)
-    map.add("file:///a/b/c/e", 5)
+    map.add(sp("file:///a/b/c/d"), 5)
+    map.add(sp("file:///a/b/c/e"), 5)
     assert(map.size === 2)
 
-    assert(!map.isNewFile("d", 5))
-    assert(!map.isNewFile("file:///d", 5))
-    assert(!map.isNewFile("file:///x/d", 5))
-    assert(!map.isNewFile("file:///x/y/d", 5))
+    assert(!map.isNewFile(sp("d"), 5))
+    assert(!map.isNewFile(sp("file:///d"), 5))
+    assert(!map.isNewFile(sp("file:///x/d"), 5))
+    assert(!map.isNewFile(sp("file:///x/y/d"), 5))
 
-    map.add("s3:///bucket/d", 5)
-    map.add("s3n:///bucket/d", 5)
-    map.add("s3a:///bucket/d", 5)
+    map.add(sp("s3:///bucket/d"), 5)
+    map.add(sp("s3n:///bucket/d"), 5)
+    map.add(sp("s3a:///bucket/d"), 5)
     assert(map.size === 2)
   }
 
   test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
     val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
-    map.add("a", 20)
+    map.add(sp("a"), 20)
     assert(map.size == 1)
 
     // Timestamp 5 should still considered a new file because purge time should be 0
-    assert(map.isNewFile("b", 9))
-    assert(map.isNewFile("b", 10))
+    assert(map.isNewFile(sp("b"), 9))
+    assert(map.isNewFile(sp("b"), 10))
 
     // Once purge, purge time should be 10 and then b would be a old file if it is less than 10.
     map.purge()
-    assert(!map.isNewFile("b", 9))
-    assert(map.isNewFile("b", 10))
+    assert(!map.isNewFile(sp("b"), 9))
+    assert(map.isNewFile(sp("b"), 10))
   }
 
   test("do not recheck that files exist during getBatch") {
@@ -2197,7 +2198,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
       assert(files.forall(_.batchId == batchId))
 
-      val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
+      val actualInputFiles = files.map { p => p.sparkPath.toUri.getPath }
       val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
         .map(_.getCanonicalPath)
       assert(actualInputFiles === expectedInputFiles)
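The suites above use both SparkPath factories that appear in this patch: `fromUrlString` (aliased to `sp`) for strings that are already URL-encoded, and `fromPathString` (see HadoopFileLinesReaderSuite) for plain, un-encoded path strings. A minimal sketch of how they relate to the `toPath`/`toUri` accessors, assuming those factory semantics and a hypothetical local path:

    import org.apache.spark.paths.SparkPath

    // An un-encoded local path, e.g. what File.getCanonicalPath would return.
    val sparkPath = SparkPath.fromPathString("/tmp/dir with space/part-00000")

    val hadoopPath = sparkPath.toPath  // org.apache.hadoop.fs.Path, for FileSystem calls
    val uri = sparkPath.toUri          // java.net.URI, with the space percent-encoded

    // A string that is already URL-encoded (such as uri.toString here) goes through
    // fromUrlString instead, as the sp alias does in the tests above.
    val roundTripped = SparkPath.fromUrlString(uri.toString)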
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index aff014261ba..a9314397dcf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.hive.orc
 
-import java.net.URI
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -152,7 +151,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
-      val filePath = new Path(new URI(file.filePath))
+      val filePath = file.toPath
 
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
@@ -166,7 +165,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
       val orcRecordReader = {
         val job = Job.getInstance(conf)
-        FileInputFormat.setInputPaths(job, file.filePath)
+        FileInputFormat.setInputPaths(job, file.urlEncodedPath)
 
         // Custom OrcRecordReader is used to get
         // ObjectInspector during recordReader creation itself and can

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org