This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new fb2bdeaa234 [SPARK-40918][SQL][3.3] Mismatch between 
FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output
fb2bdeaa234 is described below

commit fb2bdeaa234b76fdd1e2b6101f714891daa89af9
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Mon Oct 31 14:00:09 2022 +0800

    [SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and 
ParquetFileFormat on producing columnar output
    
    ### What changes were proposed in this pull request?
    
    We move the decision about supporting columnar output based on WSCG one 
level from ParquetFileFormat / OrcFileFormat up to FileSourceScanExec, and pass 
it as a new required option for ParquetFileFormat / OrcFileFormat. Now the 
semantics is as follows:
    * `ParquetFileFormat.supportsBatch` and `OrcFileFormat.supportsBatch` 
returns whether it **can**, not necessarily **will** return columnar output.
    * To return columnar output, an option `FileFormat.OPTION_RETURNING_BATCH` 
needs to be passed to `buildReaderWithPartitionValues` in these two file 
formats. It should only be set to `true` if `supportsBatch` is also `true`, but 
it can be set to `false` if we don't want columnar output nevertheless - this 
way, `FileSourceScanExec` can set it to false when there are more than 100 
columsn for WSCG, and `ParquetFileFormat` / `OrcFileFormat` doesn't have to 
concern itself about WSCG limits.
    * To avoid not passing it by accident, this option is made required. Making 
it required requires updating a few places that use it, but an error resulting 
from this is very obscure. It's better to fail early and explicitly here.
    
    ### Why are the changes needed?
    
    This explains it for `ParquetFileFormat`. `OrcFileFormat` had exactly the 
same issue.
    
    `java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to 
org.apache.spark.sql.catalyst.InternalRow` was being thrown because 
ParquetReader was outputting columnar batches, while FileSourceScanExec 
expected row output.
    
    The mismatch comes from the fact that `ParquetFileFormat.supportBatch` 
depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the 
threshold is 100 fields.
    
    When this is used in `FileSourceScanExec`:
    ```
      override lazy val supportsColumnar: Boolean = {
          relation.fileFormat.supportBatch(relation.sparkSession, schema)
      }
    ```
    the `schema` comes from output attributes, which includes extra metadata 
attributes.
    
    However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was 
calculated again as
    ```
          relation.fileFormat.buildReaderWithPartitionValues(
            sparkSession = relation.sparkSession,
            dataSchema = relation.dataSchema,
            partitionSchema = relation.partitionSchema,
            requiredSchema = requiredSchema,
            filters = pushedDownFilters,
            options = options,
            hadoopConf = hadoopConf
    ...
    val resultSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
    ...
    val returningBatch = supportBatch(sparkSession, resultSchema)
    ```
    
    Where `requiredSchema` and `partitionSchema` wouldn't include the metadata 
columns:
    ```
    FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, 
file_path#6388)
    FileSourceScanExec: dataSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
    FileSourceScanExec: partitionSchema: StructType()
    FileSourceScanExec: requiredSchema: 
StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true))
    ```
    
    Column like `file_path#6388` are added by the scan, and contain metadata 
added by the scan, not by the file reader which concerns itself with what is 
within the file.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Not a public API change, but it is now required to pass 
`FileFormat.OPTION_RETURNING_BATCH` in `options` to 
`ParquetFileFormat.buildReaderWithPartitionValues`. The only user of this API 
in Apache Spark is `FileSourceScanExec`.
    
    ### How was this patch tested?
    
    Tests added
    
    Backports #38397 from juliuszsompolski/SPARK-40918.
    
    Authored-by: Juliusz Sompolski <julekdatabricks.com>
    Signed-off-by: Wenchen Fan <wenchendatabricks.com>
    
    Closes #38431 from juliuszsompolski/SPARK-40918-3.3.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 11 +++++--
 .../sql/execution/datasources/FileFormat.scala     | 13 ++++++++
 .../execution/datasources/orc/OrcFileFormat.scala  | 33 +++++++++++++++++---
 .../datasources/parquet/ParquetFileFormat.scala    | 36 ++++++++++++++++++----
 .../datasources/FileMetadataStructSuite.scala      | 26 ++++++++++++++++
 5 files changed, 107 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 40d29af28f9..47103f6698f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -211,7 +211,12 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy 
intentionally
   // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    val conf = relation.sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, 
schema)
+    requiredWholeStageCodegenSettings &&
+      relation.fileFormat.supportBatch(relation.sparkSession, schema)
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -447,6 +452,8 @@ case class FileSourceScanExec(
   }
 
   lazy val inputRDD: RDD[InternalRow] = {
+    val options = relation.options +
+      (FileFormat.OPTION_RETURNING_BATCH -> supportsColumnar.toString)
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -454,7 +461,7 @@ case class FileSourceScanExec(
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
         filters = pushedDownFilters,
-        options = relation.options,
+        options = options,
         hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
     val readRDD = if (bucketedScan) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index f9b37fb5d9f..0263de8525f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -60,6 +60,11 @@ trait FileFormat {
 
   /**
    * Returns whether this format supports returning columnar batch or not.
+   * If columnar batch output is requested, users shall supply
+   * FileFormat.OPTION_RETURNING_BATCH -> true
+   * in relation options when calling buildReaderWithPartitionValues.
+   * This should only be passed as true if it can actually be supported.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
    *
    * TODO: we should just have different traits for the different formats.
    */
@@ -184,6 +189,14 @@ object FileFormat {
 
   val METADATA_NAME = "_metadata"
 
+  /**
+   * Option to pass to buildReaderWithPartitionValues to return columnar batch 
output or not.
+   * For ParquetFileFormat and OrcFileFormat, passing this option is required.
+   * This should only be passed as true if it can actually be supported, which 
can be checked
+   * by calling supportBatch.
+   */
+  val OPTION_RETURNING_BATCH = "returning_batch"
+
   // supported metadata struct fields for hadoop fs relation
   val METADATA_STRUCT: StructType = new StructType()
     .add(StructField(FILE_PATH, StringType))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 2b060c90153..34517ed60de 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -100,8 +99,7 @@ class OrcFileFormat
 
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.orcVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema) &&
+    conf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, 
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
   }
@@ -113,6 +111,18 @@ class OrcFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in 
options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -124,9 +134,24 @@ class OrcFileFormat
 
     val resultSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
     val sqlConf = sparkSession.sessionState.conf
-    val enableVectorizedReader = supportBatch(sparkSession, resultSchema)
     val capacity = sqlConf.orcVectorizedReaderBatchSize
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by 
changing conf.
+    val enableVectorizedReader = sqlConf.orcVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for OrcFileFormat. " +
+              "To workaround this issue, set 
spark.sql.orc.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (enableVectorizedReader) {
+      // If the passed option said that we are to return batches, we need to 
also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, 
sqlConf.caseSensitiveAnalysis)
 
     val broadcastedConf =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 9765e7c7801..f66434d3caf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -45,7 +45,6 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, 
OnHeapColumnVector}
 import org.apache.spark.sql.internal.SQLConf
@@ -169,12 +168,11 @@ class ParquetFileFormat
   }
 
   /**
-   * Returns whether the reader will return the rows as batch or not.
+   * Returns whether the reader can return the rows as batch or not.
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
     val conf = sparkSession.sessionState.conf
-    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && 
conf.wholeStageEnabled &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
   }
 
   override def vectorTypes(
@@ -197,6 +195,18 @@ class ParquetFileFormat
     true
   }
 
+  /**
+   * Build the reader.
+   *
+   * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in 
options, to indicate whether
+   *       the reader should return row or columnar output.
+   *       If the caller can handle both, pass
+   *       FileFormat.OPTION_RETURNING_BATCH ->
+   *         supportBatch(sparkSession,
+   *           StructType(requiredSchema.fields ++ partitionSchema.fields))
+   *       as the option.
+   *       It should be set to "true" only if this reader can support it.
+   */
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
@@ -245,8 +255,6 @@ class ParquetFileFormat
     val timestampConversion: Boolean = 
sqlConf.isParquetINT96TimestampConversion
     val capacity = sqlConf.parquetVectorizedReaderBatchSize
     val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
-    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-    val returningBatch = supportBatch(sparkSession, resultSchema)
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
@@ -257,6 +265,22 @@ class ParquetFileFormat
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
+    // Should always be set by FileSourceScanExec creating this.
+    // Check conf before checking option, to allow working around an issue by 
changing conf.
+    val returningBatch = 
sparkSession.sessionState.conf.parquetVectorizedReaderEnabled &&
+      options.get(FileFormat.OPTION_RETURNING_BATCH)
+        .getOrElse {
+          throw new IllegalArgumentException(
+            "OPTION_RETURNING_BATCH should always be set for 
ParquetFileFormat. " +
+              "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false.")
+        }
+        .equals("true")
+    if (returningBatch) {
+      // If the passed option said that we are to return batches, we need to 
also be able to
+      // do this based on config and resultSchema.
+      assert(supportBatch(sparkSession, resultSchema))
+    }
+
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index ad75f634050..5ab439746c8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -622,4 +622,30 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
       }
     }
   }
+
+  Seq("parquet", "orc").foreach { format =>
+    test(s"SPARK-40918: Output cols around WSCG.isTooManyFields limit in 
$format") {
+      // The issue was that ParquetFileFormat would not count the _metadata 
columns towards
+      // the WholeStageCodegenExec.isTooManyFields limit, while 
FileSourceScanExec would,
+      // resulting in Parquet reader returning columnar output, while scan 
expected row.
+      withTempPath { dir =>
+        sql(s"SELECT ${(1 to 100).map(i => s"id+$i as c$i").mkString(", ")} 
FROM RANGE(100)")
+          .write.format(format).save(dir.getAbsolutePath)
+        (98 to 102).foreach { wscgCols =>
+          withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> 
wscgCols.toString) {
+            // Would fail with
+            // java.lang.ClassCastException: 
org.apache.spark.sql.vectorized.ColumnarBatch
+            // cannot be cast to org.apache.spark.sql.catalyst.InternalRow
+            sql(
+              s"""
+                 |SELECT
+                 |  ${(1 to 100).map(i => s"sum(c$i)").mkString(", ")},
+                 |  max(_metadata.file_path)
+                 |FROM $format.`$dir`""".stripMargin
+            ).collect()
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to