cloud-fan commented on code in PR #55962:
URL: https://github.com/apache/spark/pull/55962#discussion_r3263200468


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -299,16 +299,33 @@ object FileFormat {
    * If an extractor is available, apply it. Otherwise, look up the column's 
name in the file's
    * column value map and return the result (or null, if not found).
    *
-   * Raw values (including null) are automatically converted to literals as a 
courtesy.
+   * Raw values (including null) are automatically converted to literals as a 
courtesy. When a
+   * `dataType` is supplied, the returned `Literal` carries that type and the 
value is converted
+   * via [[Literal.create]]. This lets the value be in catalyst form already 
(e.g. `ArrayData`
+   * for an `ArrayType<StructType>` constant metadata column), since 
[[Literal.apply]]'s
+   * value-class dispatch otherwise has no case for it.
    */
   def getFileConstantMetadataColumnValue(
       name: String,
       file: PartitionedFile,
-      metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+      metadataExtractors: Map[String, PartitionedFile => Any],
+      dataType: Option[DataType] = None): Literal = {
     val extractor = metadataExtractors.getOrElse(name,
       { pf: PartitionedFile => 
pf.otherConstantMetadataColumnValues.get(name).orNull }
     )
-    Literal(extractor.apply(file))
+    val value = extractor.apply(file)
+    dataType match {
+      case Some(dt) =>
+        // Extractors are documented to return either raw values or 
`Literal`s. Unwrap

Review Comment:
   **Claim accuracy.** "Extractors are documented to return either raw values 
or `Literal`s" — this contract isn't actually documented anywhere in production 
code; `fileConstantMetadataExtractors`'s scaladoc (lines 224–237) doesn't 
mention `Literal` return values. The only mention is a test comment at 
`FileSourceCustomMetadataStructSuite.scala:101`. Either add this contract to 
`fileConstantMetadataExtractors`'s scaladoc, or rephrase to "Extractors may 
return either raw values or `Literal`s".



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -299,16 +299,33 @@ object FileFormat {
    * If an extractor is available, apply it. Otherwise, look up the column's 
name in the file's
    * column value map and return the result (or null, if not found).
    *
-   * Raw values (including null) are automatically converted to literals as a 
courtesy.
+   * Raw values (including null) are automatically converted to literals as a 
courtesy. When a
+   * `dataType` is supplied, the returned `Literal` carries that type and the 
value is converted
+   * via [[Literal.create]]. This lets the value be in catalyst form already 
(e.g. `ArrayData`
+   * for an `ArrayType<StructType>` constant metadata column), since 
[[Literal.apply]]'s
+   * value-class dispatch otherwise has no case for it.
    */
   def getFileConstantMetadataColumnValue(
       name: String,
       file: PartitionedFile,
-      metadataExtractors: Map[String, PartitionedFile => Any]): Literal = {
+      metadataExtractors: Map[String, PartitionedFile => Any],
+      dataType: Option[DataType] = None): Literal = {

Review Comment:
   **Design — columnar path inconsistency / optional dataType.**
   
   The columnar caller at `FileScanRDD.scala:187` 
(`createMetadataColumnVector`, unchanged) still calls 
`getFileConstantMetadataColumnValue` without `dataType`. After this PR, a 
custom `FileFormat` whose extractor returns a raw `ArrayData` works for the row 
path but throws `LITERAL_TYPE` on the columnar path. The two paths now have 
inconsistent extractor contracts for complex types.
   
   Every in-tree caller (`updateMetadataRow`, `createMetadataColumnVector`, 
`createMetadataInternalRow`) has the dataType available. Consider making 
`dataType` required (or at least threading it from `createMetadataColumnVector` 
too). That collapses the `Some(dt)` / `None` dual-mode logic and unifies the 
contract across both paths.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala:
##########
@@ -56,8 +56,24 @@ object SchemaPruning extends SQLConfHelper {
    * right. This function assumes that the fields of left are a subset of the 
fields of
    * right, recursively. That is, left is a "subschema" of right, ignoring 
order of
    * fields.
+   *
+   * `atMetadataAttributeTopStruct` is set when the recursion is currently 
iterating the
+   * top StructType of a file-source metadata attribute (e.g. `_metadata`'s
+   * `StructType([file_path, file_name, ...])`). Sibling sub-attributes there 
may still be
+   * pruned (each has its own extractor), but recursion into a kept 
sub-field's data type
+   * must preserve it verbatim. That data type is the catalyst value produced 
by a single
+   * extractor, and shaving fields out of it would shift positions in the row 
the
+   * extractor produced.
+   *
+   * `inExtractorOutput` is set whenever recursion has descended below a 
metadata
+   * sub-field's data type. In that mode, the right side is returned unchanged.
    */
-  private def sortLeftFieldsByRight(left: DataType, right: DataType): DataType 
=
+  private def sortLeftFieldsByRight(
+      left: DataType,
+      right: DataType,
+      atMetadataAttributeTopStruct: Boolean = false,
+      inExtractorOutput: Boolean = false): DataType = {

Review Comment:
   **Design question — layer coupling.** `sortLeftFieldsByRight` is a generic 
catalyst utility, also used for serializer pruning (`optimizer/objects.scala`) 
and V2 file source pruning (`v2/PushDownUtils.scala`). Adding 
`atMetadataAttributeTopStruct` / `inExtractorOutput` plus the 
`FileSourceMetadataAttribute.isValid` check threads file-source-specific 
semantics into a generic recursion.
   
   An alternative is to handle metadata-attribute preservation in the V1 file 
source rule at `sql/core/execution/datasources/SchemaPruning.scala:84–88` — 
manually filter the top-level sub-attributes of the metadata struct (each is 
whole) and skip recursion inside them. That keeps the generic utility unaware 
of file-source semantics. Was that approach considered?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -342,9 +359,11 @@ object FileFormat {
       row: InternalRow,
       fieldNames: Seq[String],
       file: PartitionedFile,
-      metadataExtractors: Map[String, PartitionedFile => Any]): InternalRow = {
+      metadataExtractors: Map[String, PartitionedFile => Any],
+      fieldDataTypes: Seq[DataType] = Nil): InternalRow = {
     fieldNames.zipWithIndex.foreach { case (name, i) =>
-      getFileConstantMetadataColumnValue(name, file, metadataExtractors) match 
{
+      val dt = if (fieldDataTypes.isEmpty) None else Some(fieldDataTypes(i))

Review Comment:
   **Defensive check.** If a caller passes a non-empty `fieldDataTypes` shorter 
than `fieldNames`, `fieldDataTypes(i)` throws `IndexOutOfBoundsException`. 
Suggest adding `require(fieldDataTypes.isEmpty || fieldDataTypes.length == 
fieldNames.length)` at the top of the method, or use `fieldDataTypes.lift(i)`.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruningSuite.scala:
##########
@@ -143,4 +143,42 @@ class SchemaPruningSuite extends SparkFunSuite with 
SQLHelper {
     val prunedSchema = SchemaPruning.pruneSchema(schema, rootFields)
     assert(prunedSchema.head.metadata.getString("foo") == "bar")
   }
+
+  test("file-source metadata attribute: sub-field complex types are preserved 
across pruning") {
+    // Subfields of a file-source metadata attribute are populated by 
per-field extractors
+    // that produce complete catalyst values. Pruning inside such a subfield's 
data type
+    // would shift the row positions away from what the extractor produced. 
Pruning sibling
+    // sub-attributes (each with its own extractor) is still allowed.
+    val permissionStruct = StructType(Seq(
+      StructField("id", StringType),
+      StructField("role", StringType),
+      StructField("email", StringType)))
+    val innerMetadataSchema = StructType(Seq(
+      StructField("file_path", StringType, nullable = true),
+      StructField("file_size", LongType, nullable = true),
+      StructField("permissions", ArrayType(permissionStruct, containsNull = 
true),
+        nullable = true)))
+    val schema = StructType(Seq(
+      StructField("_metadata", innerMetadataSchema, nullable = false,
+        metadata = FileSourceMetadataAttribute.metadata("_metadata"))))
+
+    // Query touches `_metadata.permissions.email` and `_metadata.file_path`.
+    val requested = Seq(
+      SchemaPruning.RootField(
+        field = StructField("_metadata", StructType(Seq(
+          StructField("file_path", StringType),
+          StructField("permissions",
+            ArrayType(StructType(Seq(StructField("email", StringType))), 
containsNull = true))))),
+        derivedFromAtt = false))
+
+    val pruned = SchemaPruning.pruneSchema(schema, requested)
+    val prunedMetadata = pruned("_metadata").dataType.asInstanceOf[StructType]
+    // Sibling primitive sub-attributes are pruned (file_size dropped).
+    assert(prunedMetadata.fieldNames.toSeq === Seq("file_path", "permissions"))
+    // The kept complex sub-attribute keeps its full element struct (id, role, 
email)
+    // even though the query only references `email`.
+    val element = 
prunedMetadata("permissions").dataType.asInstanceOf[ArrayType]
+      .elementType.asInstanceOf[StructType]
+    assert(element.fieldNames.toSeq === Seq("id", "role", "email"))
+  }

Review Comment:
   **Test breadth.** This case covers `ArrayType<StructType>` but not a 
`StructType`-typed sub-attribute. A `_metadata.location: StructType(country, 
city)` queried as `_metadata.location.country` would exercise the same 
preservation logic in a different shape — worth extending this test or adding a 
sibling case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to