This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b9bc3c79190 [SPARK-39951][SQL] Update Parquet V2 columnar check for nested fields
b9bc3c79190 is described below

commit b9bc3c79190fd2fbe91001a96c738a176e3e0e10
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Tue Aug 2 16:50:05 2022 -0700

    [SPARK-39951][SQL] Update Parquet V2 columnar check for nested fields
    
    ### What changes were proposed in this pull request?
    
    Update the `supportColumnarReads` check for Parquet V2 to take into account support for nested fields. Also fixed a typo I saw in one of the tests.
    
    ### Why are the changes needed?
    
    Match Parquet V1 in returning columnar batches if nested field vectorization is enabled.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Parquet V2 scans will return columnar batches with nested fields if the config is enabled.
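    
    For illustration, a minimal sketch of how a user might opt in (assuming the key `spark.sql.parquet.enableNestedColumnVectorizedReader` for SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED, the V2 Parquet path being active, and a hypothetical dataset at `/tmp/nested` with a struct column `nested`):
    
    ```scala
    // Enable vectorized reads of nested columns (struct/array/map) in Parquet.
    spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
    // Route Parquet through the DSv2 reader so ParquetPartitionReaderFactory is used
    // (by removing parquet from the V1 source list).
    spark.conf.set("spark.sql.sources.useV1SourceList", "")
    
    val df = spark.read.parquet("/tmp/nested")    // hypothetical input path
    df.select("nested.c0", "nested.c1").show()    // this scan can now return columnar batches
    ```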
    
    ### How was this patch tested?
    
    Added new UTs checking both V1 and V2 return columnar batches for nested fields when the config is enabled.
    
    Closes #37379 from Kimahriman/parquet-v2-columnar.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  5 +--
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  9 +++--
 .../datasources/parquet/ParquetQuerySuite.scala    | 43 ++++++++++++++++++++++
 .../parquet/ParquetSchemaPruningSuite.scala        |  2 +-
 4 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3349f335841..513379d23d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -167,9 +167,8 @@ class ParquetFileFormat
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      ParquetUtils.isBatchReadSupportedForSchema(conf, schema) &&
-        !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
+      !WholeStageCodegenExec.isTooManyFields(conf, schema)
   }
 
   override def vectorTypes(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index c9572e474c8..0f6e5201df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -37,12 +37,13 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration
 
@@ -72,6 +73,8 @@ case class ParquetPartitionReaderFactory(
   private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
   private val enableVectorizedReader: Boolean =
     ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
+  private val supportsColumnar = enableVectorizedReader && sqlConf.wholeStageEnabled &&
+    !WholeStageCodegenExec.isTooManyFields(sqlConf, resultSchema)
   private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
   private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
   private val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -104,9 +107,7 @@ case class ParquetPartitionReaderFactory(
   }
 
   override def supportColumnarReads(partition: InputPartition): Boolean = {
-    sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
-      resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
-      resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    supportsColumnar
   }
 
   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 56694c2ca4e..5c75852af03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupport
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT._
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.struct
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -1136,6 +1137,25 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
         val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
         assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsColumnar)
         checkAnswer(df3, df.selectExpr(columns : _*))
+
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+          val df4 = spark.range(10).select(struct(
+            Seq.tabulate(11) {i => ($"id" + i).as(s"c$i")} : _*).as("nested"))
+          df4.write.mode(SaveMode.Overwrite).parquet(path)
+
+          // do not return batch - whole stage codegen is disabled for wide table (>200 columns)
+          val df5 = spark.read.parquet(path)
+          val fileScan5 = df5.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+          assert(!fileScan5.asInstanceOf[FileSourceScanExec].supportsColumnar)
+          checkAnswer(df5, df4)
+
+          // return batch
+          val columns2 = Seq.tabulate(9) {i => s"nested.c$i"}
+          val df6 = df5.selectExpr(columns2 : _*)
+          val fileScan6 = df6.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+          assert(fileScan6.asInstanceOf[FileSourceScanExec].supportsColumnar)
+          checkAnswer(df6, df4.selectExpr(columns2 : _*))
+        }
       }
     }
   }
@@ -1173,6 +1193,29 @@ class ParquetV2QuerySuite extends ParquetQuerySuite {
         val parquetScan3 = fileScan3.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
         assert(parquetScan3.createReaderFactory().supportColumnarReads(null))
         checkAnswer(df3, df.selectExpr(columns : _*))
+
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+          val df4 = spark.range(10).select(struct(
+            Seq.tabulate(11) {i => ($"id" + i).as(s"c$i")} : _*).as("nested"))
+          df4.write.mode(SaveMode.Overwrite).parquet(path)
+
+          // do not return batch - whole stage codegen is disabled for wide table (>200 columns)
+          val df5 = spark.read.parquet(path)
+          val fileScan5 = df5.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+          val parquetScan5 = fileScan5.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
+          // The method `supportColumnarReads` in Parquet doesn't depend on the input partition.
+          // Here we can pass a null input partition to the method for testing purposes.
+          assert(!parquetScan5.createReaderFactory().supportColumnarReads(null))
+          checkAnswer(df5, df4)
+
+          // return batch
+          val columns2 = Seq.tabulate(9) {i => s"nested.c$i"}
+          val df6 = df5.selectExpr(columns2 : _*)
+          val fileScan6 = df6.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+          val parquetScan6 = fileScan6.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
+          assert(parquetScan6.createReaderFactory().supportColumnarReads(null))
+          checkAnswer(df6, df4.selectExpr(columns2 : _*))
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index 6a93b72472c..5c0b7def039 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -32,7 +32,7 @@ abstract class ParquetSchemaPruningSuite extends SchemaPruningSuite with Adaptiv
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key
   override protected val vectorizedReaderNestedEnabledKey: String =
-    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
+    SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 
 }
 

