This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new aeafb175875 [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query aeafb175875 is described below commit aeafb175875c00519e03e0ea5b5f22f765dc3607 Author: Ala Luszczak <a...@databricks.com> AuthorDate: Tue Jul 19 09:04:03 2022 +0800 [SPARK-39806][SQL] Accessing `_metadata` on partitioned table can crash a query This change alters the projection used in `FileScanRDD` to attach file metadata to a row produced by the reader. This projection used to remove the partitioning columns from the produced row. The produced row had a different schema than expected by the consumers, and was missing part of the data, which resulted in query failure. This is a bug. `FileScanRDD` should produce rows matching the expected schema and containing all the requested data. Queries should not crash due to internal errors. No. Adds a new test in `FileMetadataStructSuite.scala` that reproduces the issue. Closes #37214 from ala/metadata-partition-by. 
Authored-by: Ala Luszczak <a...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 385f1c8e4037928afafbf6664e30dc268510c05e) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/DataSourceScanExec.scala | 4 ++-- .../sql/execution/datasources/FileScanRDD.scala | 4 ++-- .../datasources/FileMetadataStructSuite.scala | 26 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 9e8ae9a714d..40d29af28f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -621,7 +621,7 @@ case class FileSourceScanExec( } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions, - requiredSchema, metadataColumns) + new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns) } /** @@ -678,7 +678,7 @@ case class FileSourceScanExec( FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) new FileScanRDD(fsRelation.sparkSession, readFile, partitions, - requiredSchema, metadataColumns) + new StructType(requiredSchema.fields ++ fsRelation.partitionSchema.fields), metadataColumns) } // Filters unused DynamicPruningExpression expressions - one which has been replaced diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 20c393a5c0e..b65b36ef393 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -68,7 +68,7 @@ class FileScanRDD( @transient private val sparkSession: 
SparkSession, readFunction: (PartitionedFile) => Iterator[InternalRow], @transient val filePartitions: Seq[FilePartition], - val readDataSchema: StructType, + val readSchema: StructType, val metadataColumns: Seq[AttributeReference] = Seq.empty) extends RDD[InternalRow](sparkSession.sparkContext, Nil) { @@ -126,7 +126,7 @@ class FileScanRDD( // an unsafe projection to convert a joined internal row to an unsafe row private lazy val projection = { val joinedExpressions = - readDataSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType) + readSchema.fields.map(_.dataType) ++ metadataColumns.map(_.dataType) UnsafeProjection.create(joinedExpressions) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala index 410fc985dd3..6afea42ee83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala @@ -21,6 +21,7 @@ import java.io.File import java.sql.Timestamp import java.text.SimpleDateFormat +import org.apache.spark.TestUtils import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.functions._ @@ -30,6 +31,8 @@ import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructFiel class FileMetadataStructSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + val data0: Seq[Row] = Seq(Row("jack", 24, Row(12345L, "uom"))) val data1: Seq[Row] = Seq(Row("lily", 31, Row(54321L, "ucb"))) @@ -564,4 +567,27 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession { ) } } + + Seq(true, false).foreach { useVectorizedReader => + val label = if (useVectorizedReader) "reading batches" else "reading rows" + 
test(s"SPARK-39806: metadata for a partitioned table ($label)") { + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> useVectorizedReader.toString) { + withTempPath { dir => + // Store dynamically partitioned data. + Seq(1 -> 1).toDF("a", "b").write.format("parquet").partitionBy("b") + .save(dir.getAbsolutePath) + + // Identify the data file and its metadata. + val file = TestUtils.recursiveList(dir) + .filter(_.getName.endsWith(".parquet")).head + val expectedDf = Seq(1 -> 1).toDF("a", "b") + .withColumn(FileFormat.FILE_NAME, lit(file.getName)) + .withColumn(FileFormat.FILE_SIZE, lit(file.length())) + + checkAnswer(spark.read.parquet(dir.getAbsolutePath) + .select("*", METADATA_FILE_NAME, METADATA_FILE_SIZE), expectedDf) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org