This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 385f1c8e403 [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query
385f1c8e403 is described below

commit 385f1c8e4037928afafbf6664e30dc268510c05e
Author: Ala Luszczak <a...@databricks.com>
AuthorDate: Tue Jul 19 09:04:03 2022 +0800

    [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query

    ### What changes were proposed in this pull request?

    This change alters the projection used in `FileScanRDD` to attach file metadata to a row produced by the reader. The projection used to remove the partitioning columns from the produced row. As a result, the produced row had a different schema than its consumers expected and was missing part of the data, which caused the query to fail.

    ### Why are the changes needed?

    This is a bug. `FileScanRDD` should produce rows that match the expected schema and contain all of the requested data. Queries should not crash due to internal errors.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Adds a new test in `FileMetadataStructSuite.scala` that reproduces the issue.

    Closes #37214 from ala/metadata-partition-by.

    Authored-by: Ala Luszczak <a...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
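For context, a minimal, self-contained sketch of the access pattern that used to crash, distilled from the new test added in the diff below. The local `SparkSession` setup and the scratch path are illustrative assumptions, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object MetadataPartitionRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("repro").getOrCreate()
    import spark.implicits._

    val dir = "/tmp/spark-39806-repro" // hypothetical scratch directory
    // Write a small table, dynamically partitioned by column `b`.
    Seq(1 -> 1).toDF("a", "b").write.format("parquet")
      .partitionBy("b").mode("overwrite").save(dir)

    // Selecting `_metadata` fields alongside `*` on a partitioned table is the
    // pattern that could fail with an internal error before this fix, because
    // the row handed to the metadata projection was missing the partition columns.
    spark.read.parquet(dir)
      .select("*", "_metadata.file_name", "_metadata.file_size")
      .show()

    spark.stop()
  }
}
```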
---
 .../spark/sql/execution/DataSourceScanExec.scala   |  7 ++++--
 .../sql/execution/datasources/FileScanRDD.scala    |  4 ++--
 .../datasources/FileMetadataStructSuite.scala      | 26 ++++++++++++++++++++++
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 9e316cc88cf..5950136e79a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -650,7 +650,9 @@ case class FileSourceScanExec(
     }
 
     new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions,
-      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+
   }
 
   /**
@@ -707,7 +709,8 @@ case class FileSourceScanExec(
       FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes)
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
-      requiredSchema, metadataColumns, new FileSourceOptions(CaseInsensitiveMap(relation.options)))
+      new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns,
+      new FileSourceOptions(CaseInsensitiveMap(relation.options)))
   }
 
   // Filters unused DynamicPruningExpression expressions - one which has been replaced
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 97776413509..4c3f5629e78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -69,7 +69,7 @@ class FileScanRDD(
     @transient private val sparkSession: SparkSession,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition],
-    val readDataSchema: StructType,
+    val readSchema: StructType,
     val metadataColumns: Seq[AttributeReference] = Seq.empty,
     options: FileSourceOptions = new FileSourceOptions(CaseInsensitiveMap(Map.empty)))
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
@@ -128,7 +128,7 @@ class FileScanRDD(
   // an unsafe projection to convert a joined internal row to an unsafe row
   private lazy val projection = {
     val joinedExpressions =
-      readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
+      readSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType)
     UnsafeProjection.create(joinedExpressions)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 410fc985dd3..6afea42ee83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.sql.Timestamp
 import java.text.SimpleDateFormat
 
+import org.apache.spark.TestUtils
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.functions._
@@ -30,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructFiel
 
 class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
 
+  import testImplicits._
+
   val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom")))
 
   val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb")))
@@ -564,4 +567,27 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  Seq(true, false).foreach { useVectorizedReader =>
+    val label = if (useVectorizedReader) "reading batches" else "reading rows"
+    test(s"SPARK-39806: metadata for a partitioned table ($label)") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) {
+        withTempPath { dir =>
+          // Store dynamically partitioned data.
+          Seq(1 -> 1).toDF("a", "b").write.format("parquet").partitionBy("b")
+            .save(dir.getAbsolutePath)
+
+          // Identify the data file and its metadata.
+          val file = TestUtils.recursiveList(dir)
+            .filter(_.getName.endsWith(".parquet")).head
+          val expectedDf = Seq(1 -> 1).toDF("a", "b")
+            .withColumn(FileFormat.FILE_NAME, lit(file.getName))
+            .withColumn(FileFormat.FILE_SIZE, lit(file.length()))
+
+          checkAnswer(spark.read.parquet(dir.getAbsolutePath)
+            .select("*", METADATA_FILE_NAME, METADATA_FILE_SIZE), expectedDf)
+        }
+      }
+    }
+  }
 }
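For readers skimming the diff: the fix widens the schema handed to `FileScanRDD` from the data columns alone (`requiredSchema`) to data plus partition columns, so the unsafe projection built from it lines up with the joined rows, which already carry the partition values. A minimal sketch of that schema concatenation, using only the public `StructType` API; the field names here are illustrative, not taken from Spark internals:

```scala
import org.apache.spark.sql.types.{IntegerType, StructType}

object ReadSchemaSketch extends App {
  val requiredSchema = new StructType().add("a", IntegerType)   // column read from the data files
  val partitionSchema = new StructType().add("b", IntegerType)  // column encoded in the directory layout

  // Before the fix, FileScanRDD received only requiredSchema even though the
  // rows produced by the reader also carried the partition values, so the
  // projection built from the narrower schema did not match the actual row
  // layout. Concatenating the two field arrays keeps them aligned.
  val readSchema = new StructType(requiredSchema.fields ++ partitionSchema.fields)

  println(readSchema.simpleString) // struct<a:int,b:int>
}
```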