CTTY commented on code in PR #8885:
URL: https://github.com/apache/hudi/pull/8885#discussion_r1223566158
##
hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34HoodieParquetFileFormat.scala:
##
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{Cast, JoinedRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.execution.datasources.parquet.Spark34HoodieParquetFileFormat._
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+/**
+ * This class is an extension of [[ParquetFileFormat]] overriding Spark-specific behavior
+ * that's not possible to customize in any other way.
+ *
+ * NOTE: This is a version of the [[ParquetFileFormat]] impl from Spark 3.4 w/ the following changes applied to it:
+ * <ol>
+ *   <li>Avoiding appending partition values to the rows read from the data file</li>
+ *   <li>Schema on-read</li>
+ * </ol>
+ */
+class Spark34HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
+
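+  // Batch (vectorized) reads are only supported when the vectorized Parquet reader is
+  // enabled and every requested field is of an atomic (non-nested) type.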
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.parquetVectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
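+  // Columnar output is only emitted when whole-stage codegen is enabled and able to consume it
+  // (the schema stays below the codegen field limit) and batch reads are supported.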
+  def supportsColumnar(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    // Only output columnar if there is WSCG to read it.
+    val requiredWholeStageCodegenSettings =
+      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
+    requiredWholeStageCodegenSettings &&
+      supportBatch(sparkSession, schema)
+  }
+
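+  // Builds the per-file Parquet reader; per the class doc, partition values are appended to
+  // the rows read from the data file only when shouldAppendPartitionValues is set.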
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(
+      ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+      requiredSchema.json)
+    hadoopConf.set(
+      ParquetWriteSupport.SPA