This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b9bc3c79190  [SPARK-39951][SQL] Update Parquet V2 columnar check for nested fields
b9bc3c79190 is described below

commit b9bc3c79190fd2fbe91001a96c738a176e3e0e10
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Tue Aug 2 16:50:05 2022 -0700

    [SPARK-39951][SQL] Update Parquet V2 columnar check for nested fields

    ### What changes were proposed in this pull request?

    Update the `supportsColumnarReads` check for Parquet V2 to take into account support for nested fields. Also fixed a typo I saw in one of the tests.

    ### Why are the changes needed?

    Match Parquet V1 in returning columnar batches if nested field vectorization is enabled.

    ### Does this PR introduce _any_ user-facing change?

    Parquet V2 scans will return columnar batches with nested fields if the config is enabled.

    ### How was this patch tested?

    Added new UTs checking both V1 and V2 return columnar batches for nested fields when the config is enabled.

    Closes #37379 from Kimahriman/parquet-v2-columnar.

    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  5 +--
 .../v2/parquet/ParquetPartitionReaderFactory.scala |  9 +++--
 .../datasources/parquet/ParquetQuerySuite.scala    | 43 ++++++++++++++++++++++
 .../parquet/ParquetSchemaPruningSuite.scala        |  2 +-
 4 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 3349f335841..513379d23d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -167,9 +167,8 @@ class ParquetFileFormat
    */
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
     val conf = sparkSession.sessionState.conf
-    conf.parquetVectorizedReaderEnabled && conf.wholeStageEnabled &&
-      ParquetUtils.isBatchReadSupportedForSchema(conf, schema) &&
-      !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    ParquetUtils.isBatchReadSupportedForSchema(conf, schema) && conf.wholeStageEnabled &&
+      !WholeStageCodegenExec.isTooManyFields(conf, schema)
   }

   override def vectorTypes(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index c9572e474c8..0f6e5201df8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -37,12 +37,13 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
+import org.apache.spark.sql.execution.WholeStageCodegenExec
 import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, DataSourceUtils, PartitionedFile, RecordReaderIterator}
 import org.apache.spark.sql.execution.datasources.parquet._
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{AtomicType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration

@@ -72,6 +73,8 @@ case class ParquetPartitionReaderFactory(
   private val enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled
   private val enableVectorizedReader: Boolean =
     ParquetUtils.isBatchReadSupportedForSchema(sqlConf, resultSchema)
+  private val supportsColumnar = enableVectorizedReader && sqlConf.wholeStageEnabled &&
+    !WholeStageCodegenExec.isTooManyFields(sqlConf, resultSchema)
   private val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
   private val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion
   private val capacity = sqlConf.parquetVectorizedReaderBatchSize
@@ -104,9 +107,7 @@ case class ParquetPartitionReaderFactory(
   }

   override def supportColumnarReads(partition: InputPartition): Boolean = {
-    sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled &&
-      resultSchema.length <= sqlConf.wholeStageMaxNumFields &&
-      resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    supportsColumnar
   }

   override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 56694c2ca4e..5c75852af03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupport
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT._
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.struct
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -1136,6 +1137,25 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
         val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
         assert(fileScan3.asInstanceOf[FileSourceScanExec].supportsColumnar)
         checkAnswer(df3, df.selectExpr(columns : _*))
+
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+          val df4 = spark.range(10).select(struct(
+            Seq.tabulate(11) {i => ($"id" + i).as(s"c$i")} : _*).as("nested"))
+          df4.write.mode(SaveMode.Overwrite).parquet(path)
+
+          // do not return batch - whole stage codegen is disabled for wide table (>200 columns)
+          val df5 = spark.read.parquet(path)
+          val fileScan5 = df5.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+          assert(!fileScan5.asInstanceOf[FileSourceScanExec].supportsColumnar)
+          checkAnswer(df5, df4)
+
+          // return batch
+          val columns2 = Seq.tabulate(9) {i => s"nested.c$i"}
+          val df6 = df5.selectExpr(columns2 : _*)
+          val fileScan6 = df6.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+          assert(fileScan6.asInstanceOf[FileSourceScanExec].supportsColumnar)
+          checkAnswer(df6, df4.selectExpr(columns2 : _*))
+        }
       }
     }
   }
@@ -1173,6 +1193,29 @@ class ParquetV2QuerySuite extends ParquetQuerySuite {
         val parquetScan3 = fileScan3.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
         assert(parquetScan3.createReaderFactory().supportColumnarReads(null))
         checkAnswer(df3, df.selectExpr(columns : _*))
+
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+          val df4 = spark.range(10).select(struct(
+            Seq.tabulate(11) {i => ($"id" + i).as(s"c$i")} : _*).as("nested"))
+          df4.write.mode(SaveMode.Overwrite).parquet(path)
+
+          // do not return batch - whole stage codegen is disabled for wide table (>200 columns)
+          val df5 = spark.read.parquet(path)
+          val fileScan5 = df5.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+          val parquetScan5 = fileScan5.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
+          // The method `supportColumnarReads` in Parquet doesn't depends on the input partition.
+          // Here we can pass null input partition to the method for testing propose.
+          assert(!parquetScan5.createReaderFactory().supportColumnarReads(null))
+          checkAnswer(df5, df4)
+
+          // return batch
+          val columns2 = Seq.tabulate(9) {i => s"nested.c$i"}
+          val df6 = df5.selectExpr(columns2 : _*)
+          val fileScan6 = df6.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get
+          val parquetScan6 = fileScan6.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan]
+          assert(parquetScan6.createReaderFactory().supportColumnarReads(null))
+          checkAnswer(df6, df4.selectExpr(columns2 : _*))
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index 6a93b72472c..5c0b7def039 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -32,7 +32,7 @@ abstract class ParquetSchemaPruningSuite extends SchemaPruningSuite with Adaptiv
   override protected val vectorizedReaderEnabledKey: String =
     SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key
   override protected val vectorizedReaderNestedEnabledKey: String =
-    SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
+    SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
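
For illustration, a minimal sketch of exercising the new behavior from user code, not taken from the patch itself: it assumes a Spark build that includes this commit, a local session, and a writable /tmp path, and it uses the existing SQLConf keys spark.sql.parquet.enableNestedColumnVectorizedReader (off by default at the time of this patch) and spark.sql.sources.useV1SourceList (an empty value forces the DataSource V2 Parquet path, so ParquetPartitionReaderFactory is exercised).

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.struct

  val spark = SparkSession.builder()
    .master("local[*]")
    // Force the DataSource V2 Parquet implementation so the V2 columnar check applies.
    .config("spark.sql.sources.useV1SourceList", "")
    // Enable vectorized (columnar) reads for nested columns.
    .config("spark.sql.parquet.enableNestedColumnVectorizedReader", "true")
    .getOrCreate()
  import spark.implicits._

  // Write a Parquet file with a nested struct column (hypothetical /tmp path).
  val df = spark.range(10)
    .select(struct(($"id" + 1).as("c0"), ($"id" + 2).as("c1")).as("nested"))
  df.write.mode("overwrite").parquet("/tmp/spark-39951-nested")

  // With the config enabled, a scan that reads a nested field can now return
  // columnar batches under the V2 code path as well, matching Parquet V1.
  spark.read.parquet("/tmp/spark-39951-nested").select("nested.c0").show()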