This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 223afea9960c [SPARK-46473][SQL] Reuse `getPartitionedFile` method
223afea9960c is described below

commit 223afea9960c7ef1a4c8654e043e860f6c248185
Author: huangxiaoping <1754789...@qq.com>
AuthorDate: Wed Jan 31 22:59:20 2024 -0600

    [SPARK-46473][SQL] Reuse `getPartitionedFile` method

    ### What changes were proposed in this pull request?
    Reuse `getPartitionedFile` method to reduce redundant code.

    ### Why are the changes needed?
    Reduce redundant code.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44437 from huangxiaopingRD/SPARK-46473.

    Authored-by: huangxiaoping <1754789...@qq.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../apache/spark/sql/execution/DataSourceScanExec.scala  |  2 +-
 .../apache/spark/sql/execution/PartitionedFileUtil.scala | 14 +++++++-------
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index b3b2b0eab055..2622eadaefb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -645,7 +645,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
+        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen))
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(f.toPath.getName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
index b31369b6768e..997859058de1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala
@@ -33,20 +33,20 @@ object PartitionedFileUtil {
       (0L until file.getLen by maxSplitBytes).map { offset =>
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-        val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
-        PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
-          file.getModificationTime, file.getLen, file.metadata)
+        getPartitionedFile(file, partitionValues, offset, size)
       }
     } else {
-      Seq(getPartitionedFile(file, partitionValues))
+      Seq(getPartitionedFile(file, partitionValues, 0, file.getLen))
     }
   }
 
   def getPartitionedFile(
       file: FileStatusWithMetadata,
-      partitionValues: InternalRow): PartitionedFile = {
-    val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
-    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
+      partitionValues: InternalRow,
+      start: Long,
+      length: Long): PartitionedFile = {
+    val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length)
+    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), start, length, hosts,
       file.getModificationTime, file.getLen, file.metadata)
   }
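For readers skimming the diff: the refactor threads explicit (start, length) arguments
through getPartitionedFile so that both the split path in splitFiles and the whole-file
and bucketed-scan paths reuse one construction site. The standalone Scala sketch below
mirrors that pattern under simplified assumptions; FileInfo, Slice, slice and splitFile
are made-up stand-ins for illustration only, not Spark's FileStatusWithMetadata,
PartitionedFile, or PartitionedFileUtil APIs.

// Standalone sketch, not Spark code: FileInfo, Slice, slice and splitFile
// are illustrative stand-ins for FileStatusWithMetadata, PartitionedFile
// and the PartitionedFileUtil methods touched by this commit.
object GetPartitionedFileSketch {
  case class FileInfo(path: String, len: Long)
  case class Slice(path: String, start: Long, length: Long)

  // Analogue of the new getPartitionedFile(file, partitionValues, start, length):
  // a single place that builds a slice from an explicit offset and length.
  def slice(file: FileInfo, start: Long, length: Long): Slice =
    Slice(file.path, start, length)

  // Analogue of splitFiles: both branches now delegate to slice(), instead of
  // the splittable branch duplicating the construction logic inline.
  def splitFile(file: FileInfo, splittable: Boolean, maxSplitBytes: Long): Seq[Slice] =
    if (splittable) {
      (0L until file.len by maxSplitBytes).map { offset =>
        slice(file, offset, math.min(maxSplitBytes, file.len - offset))
      }
    } else {
      Seq(slice(file, 0L, file.len))
    }

  def main(args: Array[String]): Unit = {
    // A 300-byte file split at 128 bytes yields slices (0,128), (128,128), (256,44).
    splitFile(FileInfo("/tmp/part-00000", 300L), splittable = true, maxSplitBytes = 128L)
      .foreach(println)
  }
}

The whole-file case is then just the (0, file length) instance of the same helper,
which is exactly what the new call sites in the diff pass.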
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org