johanl-db commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1149566989


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -519,6 +519,13 @@ object FileSourceMetadataAttribute {
   def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
     attr.withMetadata(removeInternalMetadata(attr.metadata))
 
+  /**
+   * Cleanup the internal metadata information of a struct field, if it is

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, 
TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the 
given file format.
+   * All fields the file format's _metadata struct defines.

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -322,18 +329,13 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
       // extra Project node: wrap flat metadata columns to a metadata struct
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
-        val structColumns = metadataColumns.map { col => col.name match {
-            case FileFormat.FILE_PATH | FileFormat.FILE_NAME | 
FileFormat.FILE_SIZE |
-                 FileFormat.FILE_BLOCK_START | FileFormat.FILE_BLOCK_LENGTH |
-                 FileFormat.FILE_MODIFICATION_TIME =>
-              col
-            case FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME =>
-              generatedMetadataColumns
-                .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
-                // Change the `_tmp_metadata_row_index` to `row_index`,
-                // and also change the nullability to not nullable,
-                // which is consistent with the nullability of `row_index` 
field
-                .get.withName(FileFormat.ROW_INDEX).withNullability(false)
+        val structColumns = 
metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
+          // Construct the metadata struct the query expects to see, using the 
columns we previously
+          // created. Be sure to restore the proper name and nullability for 
each metadata field.
+          metadataColumnsByName(field.name) match {

Review Comment:
   I simplified this code to always restore the name and nullability, so I don't 
think it warrants a method anymore.
   I actually think the previous code was not correct, since you could have a 
generated column created with `nullable = true` on 
[L272](https://github.com/apache/spark/pull/40545/commits/2f5f9a26ec5cdf0c04893a18e2cfa386228e7aa2#diff-7e4f78e90e8699733afbe43e2b265b95e514896ac68f1fb9e60705d59a0b7ed9R272)
 and its nullability wouldn't be reset if its `name` and `internalName` are the same.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,37 +247,41 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataStructField(
-                  name = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  internalName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  dataType = LongType,
-                  nullable = true).toAttribute
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataStructField(field.name, 
field.dataType).toAttribute
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {

Review Comment:
   Added method `createMetadataColumn`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, 
TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the 
given file format.
+   * All fields the file format's _metadata struct defines.
    */
-  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
-    val fields = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
-      BASE_METADATA_FIELDS :+ StructField(FileFormat.ROW_INDEX, LongType, 
nullable = false)
+  def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] =
+    if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+      BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
     } else {
       BASE_METADATA_FIELDS
     }
+
+  /**
+   * Create a file metadata struct column containing fields supported by the 
given file format.

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -198,6 +198,10 @@ object FileFormat {
   // until they can be placed in the _metadata struct.
   val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
 
+  // The field readers can use to access the generated row index column.

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to