jaceklaskowski commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1148384721


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -198,6 +198,10 @@ object FileFormat {
   // until they can be placed in the _metadata struct.
   val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
 
+  // The field readers can use to access the generated row index column.

Review Comment:
   I don't think the description adds anything beyond what's already implied 
by `ROW_INDEX_FIELD` being a public val (_constant_) that others (e.g., 
"readers") could use. Please remove the comment.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,37 +247,41 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed 
to readers,
       //  as the values will be null when trying to read the missing column 
from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata 
projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = 
mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection 
node below.
+      // Also remember the attribute for each column name, so we can easily 
map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { 
field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ 
partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new 
AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in 
combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} 
column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataStructField(
-                  name = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  internalName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  dataType = LongType,
-                  nullable = true).toAttribute
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataStructField(field.name, 
field.dataType).toAttribute
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {

Review Comment:
   Can we introduce an (even internal) method with a name that says what this 
`foreach` does? Or even a helper function that `foreach` uses for the check. 
It's going to make comprehension so much easier 🙏 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -519,6 +519,13 @@ object FileSourceMetadataAttribute {
   def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
     attr.withMetadata(removeInternalMetadata(attr.metadata))
 
+  /**
+   * Cleanup the internal metadata information of a struct field, if it is

Review Comment:
   nit: Just "Removes the internal field metadata" would be enough. It's not 
clear even from the implementation itself that it's important that the 
`StructField` be of the types mentioned.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, 
TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the 
given file format.
+   * All fields the file format's _metadata struct defines.

Review Comment:
   Replace with "Supported metadata fields of the given [[FileFormat]]"



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, 
TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the 
given file format.
+   * All fields the file format's _metadata struct defines.
    */
-  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
-    val fields = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
-      BASE_METADATA_FIELDS :+ StructField(FileFormat.ROW_INDEX, LongType, 
nullable = false)
+  def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] =
+    if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+      BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
     } else {
       BASE_METADATA_FIELDS
     }
+
+  /**
+   * Create a file metadata struct column containing fields supported by the 
given file format.

Review Comment:
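   nit: Start with "Creates" and link the type, i.e. "Creates a file metadata 
struct column containing fields supported by the given [[FileFormat]]."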



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -322,18 +329,13 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
       // extra Project node: wrap flat metadata columns to a metadata struct
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
-        val structColumns = metadataColumns.map { col => col.name match {
-            case FileFormat.FILE_PATH | FileFormat.FILE_NAME | 
FileFormat.FILE_SIZE |
-                 FileFormat.FILE_BLOCK_START | FileFormat.FILE_BLOCK_LENGTH |
-                 FileFormat.FILE_MODIFICATION_TIME =>
-              col
-            case FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME =>
-              generatedMetadataColumns
-                .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
-                // Change the `_tmp_metadata_row_index` to `row_index`,
-                // and also change the nullability to not nullable,
-                // which is consistent with the nullability of `row_index` 
field
-                .get.withName(FileFormat.ROW_INDEX).withNullability(false)
+        val structColumns = 
metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
+          // Construct the metadata struct the query expects to see, using the 
columns we previously
+          // created. Be sure to restore the proper name and nullability for 
each metadata field.
+          metadataColumnsByName(field.name) match {

Review Comment:
   Can we introduce a method `restoreProperNameAndNullability(field: 
StructField)` for this comment and the code that follows? We could then remove 
the comment altogether since the method name would say it all. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to