This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50520fe  [SPARK-38314][SQL] Fix of failing to read parquet files after writing the hidden file metadata in
50520fe is described below

commit 50520fe3eb8237adefdb30938736536148110be1
Author: yaohua <yaohua.z...@databricks.com>
AuthorDate: Mon Feb 28 19:47:17 2022 +0800

    [SPARK-38314][SQL] Fix of failing to read parquet files after writing the hidden file metadata in
    
    ### What changes were proposed in this pull request?
    Selecting and then writing a DataFrame containing the hidden file metadata column `_metadata` to a file format such as `parquet` or `delta` will still keep the internal `Attribute` metadata information on the output columns. When those `parquet` or `delta` files are read back, the read breaks because Spark wrongly treats the user data column `_metadata` as a hidden file source metadata column.
    
    ```
    // prepare a file source df
    df.select("*", "_metadata").write.format("parquet").save(path)
    
    spark.read.format("parquet").load(path).select("*").show()
    ```
    This PR fixes the issue by cleaning up any remaining internal metadata information from the output columns before they are written out.
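    
    As an illustrative check (not part of the patch), reading the written files back should now succeed, and the written schema carries `_metadata` as plain user data rather than as a hidden file source metadata column:
    
    ```
    // `path` is the same location used in the snippet above
    val written = spark.read.format("parquet").load(path)
    
    // succeeds after the fix; the struct field's metadata no longer carries
    // the internal file-source-metadata marker (illustrative inspection)
    written.schema("_metadata").metadata
    ```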
    
    ### Why are the changes needed?
    Bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    A new unit test in `FileMetadataStructSuite`.
    
    Closes #35650 from Yaohua628/spark-38314.
    
    Authored-by: yaohua <yaohua.z...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/expressions/namedExpressions.scala    | 15 +++++++++
 .../execution/datasources/FileFormatWriter.scala   | 14 ++++++---
 .../datasources/FileMetadataStructSuite.scala      | 36 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 248584b..d5df6a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -475,4 +475,19 @@ object FileSourceMetadataAttribute {
          && attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) => Some(attr)
       case _ => None
     }
+
+  /**
+   * Cleanup the internal metadata information of an attribute if it is
+   * a [[FileSourceMetadataAttribute]]; it removes both [[METADATA_COL_ATTR_KEY]] and
+   * [[FILE_SOURCE_METADATA_COL_ATTR_KEY]] from the attribute's [[Metadata]]
+   */
+  def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute = attr match {
+    case FileSourceMetadataAttribute(attr) => attr.withMetadata(
+      new MetadataBuilder().withMetadata(attr.metadata)
+        .remove(METADATA_COL_ATTR_KEY)
+        .remove(FILE_SOURCE_METADATA_COL_ATTR_KEY)
+        .build()
+    )
+    case attr => attr
+  }
 }
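
For context, the removal above relies on Spark's public `MetadataBuilder` API (`withMetadata`, `remove`, `build`). A minimal standalone sketch of the same pattern; the literal key strings below are illustrative stand-ins for `METADATA_COL_ATTR_KEY` and `FILE_SOURCE_METADATA_COL_ATTR_KEY`, not guaranteed to match the actual constants:

```
import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

// Start from an attribute's existing metadata and drop the internal marker keys.
// Removing an absent key is a no-op, so this is safe to apply to any column.
def stripMarkers(existing: Metadata): Metadata =
  new MetadataBuilder()
    .withMetadata(existing)
    .remove("__metadata_col")              // stand-in for METADATA_COL_ATTR_KEY
    .remove("__file_source_metadata_col")  // stand-in for FILE_SOURCE_METADATA_COL_ATTR_KEY
    .build()
```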
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 409e334..fe48ddc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -111,7 +111,11 @@ object FileFormatWriter extends Logging {
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
+    // cleanup the internal metadata information of
+    // the file source metadata attribute if any before write out
+    val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
+      .map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
+    val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
 
     var needConvert = false
     val projectList: Seq[NamedExpression] = plan.output.map {
@@ -167,12 +171,12 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID.toString,
      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = outputSpec.outputColumns,
+      allColumns = finalOutputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketSpec = writerBucketSpec,
-      path = outputSpec.outputPath,
-      customPartitionLocations = outputSpec.customPartitionLocations,
+      path = finalOutputSpec.outputPath,
+      customPartitionLocations = finalOutputSpec.customPartitionLocations,
      maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
       timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -212,7 +216,7 @@ object FileFormatWriter extends Logging {
        // the physical plan may have different attribute ids due to optimizer removing some
        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
         val orderingExpr = bindReferences(
-          requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
+          requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
         val sortPlan = SortExec(
           orderingExpr,
           global = false,
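
One design point worth noting in the change above: once the sanitized `finalOutputSpec` exists, everything downstream (the data columns, the write job description, the sort ordering) is derived from it rather than from the original `outputSpec`, so the internal markers cannot leak back in. A heavily simplified, illustrative sketch of that shape (names and types invented here, not from the patch):

```
// Simplified stand-in for the real output spec; only the columns matter here.
final case class Spec(outputColumns: Seq[String])

def writeOut(outputSpec: Spec, cleanup: String => String): Unit = {
  // sanitize once, up front...
  val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns.map(cleanup))
  // ...then derive every downstream value from the sanitized spec, never from outputSpec
  val dataColumns = finalOutputSpec.outputColumns.filterNot(_.startsWith("part_"))
  // build the job description, ordering, etc. from finalOutputSpec as well
}
```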
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 0d391e0..175b420 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -474,4 +474,40 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       Seq(Row("jack", 24, 12345L, f0(METADATA_FILE_SIZE)))
     )
   }
+
+  metadataColumnsTest("write _metadata in parquet and read back", schema) { (df, f0, f1) =>
+    // SPARK-38314: Selecting and then writing df containing hidden file
+    // metadata column `_metadata` into parquet files will still keep the internal `Attribute`
+    // metadata information of the column. It will then fail when read again.
+    withTempDir { dir =>
+      df.select("*", "_metadata")
+        .write.format("parquet").save(dir.getCanonicalPath + "/new-data")
+
+      val newDF = spark.read.format("parquet").load(dir.getCanonicalPath + "/new-data")
+
+      // SELECT * will have: name, age, info, _metadata of f0 and f1
+      checkAnswer(
+        newDF.select("*"),
+        Seq(
+          Row("jack", 24, Row(12345L, "uom"),
+            Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
+              f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+          Row("lily", 31, Row(54321L, "ucb"),
+            Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
+              f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        )
+      )
+
+      // SELECT _metadata won't override the existing user data (_metadata of f0 and f1)
+      checkAnswer(
+        newDF.select("_metadata"),
+        Seq(
+          Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
+            f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
+          Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
+            f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+        )
+      )
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
