This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f715e8a02e8 [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)
f715e8a02e8 is described below

commit f715e8a02e8ee5561274ad38bdda5e863317b240
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri Apr 12 13:29:12 2024 -0400

    [HUDI-7565] Create spark file readers to read a single file instead of an entire partition (#10954)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../datasources/parquet/SparkParquetReader.scala   |  44 ++++
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  18 +-
 .../parquet/SparkParquetReaderBase.scala           |  96 +++++++
 .../parquet/TestSparkParquetReaderFormat.scala     |  56 ++++
 .../hudi/functional/TestSparkParquetReader.java    |  48 ++++
 .../org/apache/hudi/util/JavaConversions.scala     |  22 +-
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  20 +-
 .../datasources/parquet/Spark24ParquetReader.scala | 225 ++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_0Adapter.scala |  20 +-
 .../datasources/parquet/Spark30ParquetReader.scala | 229 +++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_1Adapter.scala |  19 +-
 .../datasources/parquet/Spark31ParquetReader.scala | 242 ++++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_2Adapter.scala |  20 +-
 .../datasources/parquet/Spark32ParquetReader.scala | 267 +++++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |  20 +-
 .../datasources/parquet/Spark33ParquetReader.scala | 268 +++++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |  20 +-
 .../datasources/parquet/Spark34ParquetReader.scala | 277 ++++++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |  20 +-
 .../datasources/parquet/Spark35ParquetReader.scala | 284 +++++++++++++++++++++
 20 files changed, 2206 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
new file mode 100644
index 00000000000..920e4cb0e0b
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReader.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+trait SparkParquetReader extends Serializable {
+  /**
+   * Read an individual parquet file
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  def read(file: PartitionedFile,
+           requiredSchema: StructType,
+           partitionSchema: StructType,
+           filters: Seq[Filter],
+           sharedConf: Configuration): Iterator[InternalRow]
+}
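
As a rough illustration only (not part of this patch), a minimal no-op implementation of the trait above might look like the following sketch; the class name is hypothetical, and a real reader would open the file and honor the schemas and filters:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile
    import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.StructType

    // Hypothetical no-op reader: returns no rows, but satisfies the contract above.
    class EmptyParquetReader extends SparkParquetReader {
      override def read(file: PartitionedFile,
                        requiredSchema: StructType,
                        partitionSchema: StructType,
                        filters: Seq[Filter],
                        sharedConf: Configuration): Iterator[InternalRow] = Iterator.empty
    }
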
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1c6111afe47..91fe6dabc2e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -33,7 +34,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -214,4 +216,18 @@ trait SparkAdapter extends Serializable {
    * Tries to translate a Catalyst Expression into data source Filter
    */
   def translateFilter(predicate: Expression, supportNestedPredicatePushdown: Boolean = false): Option[Filter]
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  def createParquetFileReader(vectorized: Boolean,
+                              sqlConf: SQLConf,
+                              options: Map[String, String],
+                              hadoopConf: Configuration): SparkParquetReader
 }
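
As a usage sketch (again not part of the patch; it assumes a local SparkSession and access to sparkAdapter via org.apache.hudi.SparkAdapterSupport), the new adapter hook would typically be invoked once on the driver, and the resulting reader, being Serializable, captured by the per-file lambda that runs on executors:

    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader

    object ReaderWiring extends SparkAdapterSupport {
      def buildReader(spark: SparkSession): SparkParquetReader = {
        // Built once on the driver; the argument values here are illustrative only.
        sparkAdapter.createParquetFileReader(
          vectorized = false,
          sqlConf    = spark.sessionState.conf,
          options    = Map.empty,
          hadoopConf = spark.sessionState.newHadoopConf())
      }
    }
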
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
new file mode 100644
index 00000000000..2b47da76456
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+abstract class SparkParquetReaderBase(enableVectorizedReader: Boolean,
+                                      enableParquetFilterPushDown: Boolean,
+                                      pushDownDate: Boolean,
+                                      pushDownTimestamp: Boolean,
+                                      pushDownDecimal: Boolean,
+                                      pushDownInFilterThreshold: Int,
+                                      isCaseSensitive: Boolean,
+                                      timestampConversion: Boolean,
+                                      enableOffHeapColumnVector: Boolean,
+                                      capacity: Int,
+                                      returningBatch: Boolean,
+                                      enableRecordFilter: Boolean,
+                                      timeZoneId: Option[String]) extends SparkParquetReader {
+  /**
+   * Read an individual parquet file
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  final def read(file: PartitionedFile,
+                 requiredSchema: StructType,
+                 partitionSchema: StructType,
+                 filters: Seq[Filter],
+                 sharedConf: Configuration): Iterator[InternalRow] = {
+    val conf = new Configuration(sharedConf)
+    conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
+    conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+    ParquetWriteSupport.setSchema(requiredSchema, conf)
+    doRead(file, requiredSchema, partitionSchema, filters, conf)
+  }
+
+  /**
+   * Implemented for each spark version
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  protected def doRead(file: PartitionedFile,
+                       requiredSchema: StructType,
+                       partitionSchema: StructType,
+                       filters: Seq[Filter],
+                       sharedConf: Configuration): Iterator[InternalRow]
+
+}
+
+trait SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader
+}
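
To make the split between read() and doRead() concrete, here is a minimal hypothetical subclass (not part of the patch; all constructor values are placeholders). The final read() in the base class copies the Hadoop configuration and sets the requested-schema keys before delegating to the version-specific doRead():

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile
    import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReaderBase
    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.types.StructType

    // Hypothetical subclass showing where a Spark-version-specific doRead plugs in.
    class NoopParquetReader extends SparkParquetReaderBase(
      enableVectorizedReader = false, enableParquetFilterPushDown = false,
      pushDownDate = false, pushDownTimestamp = false, pushDownDecimal = false,
      pushDownInFilterThreshold = 10, isCaseSensitive = false, timestampConversion = false,
      enableOffHeapColumnVector = false, capacity = 4096, returningBatch = false,
      enableRecordFilter = false, timeZoneId = None) {

      override protected def doRead(file: PartitionedFile,
                                    requiredSchema: StructType,
                                    partitionSchema: StructType,
                                    filters: Seq[Filter],
                                    sharedConf: Configuration): Iterator[InternalRow] =
        Iterator.empty // a real implementation builds a (possibly vectorized) parquet record reader here
    }
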
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
new file mode 100644
index 00000000000..bf513847cfc
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSparkParquetReaderFormat.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * Class used to test [[SparkParquetReader]]
+ * This class should have the same functionality as [[ParquetFileFormat]]
+ */
+class TestSparkParquetReaderFormat extends ParquetFileFormat with SparkAdapterSupport {
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    //reader must be created outside of the lambda. This happens on the driver
+    val reader = sparkAdapter.createParquetFileReader(supportBatch(sparkSession,
+      StructType(partitionSchema.fields ++ requiredSchema.fields)),
+      sparkSession.sqlContext.conf, options, hadoopConf)
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    (file: PartitionedFile) => {
+      //code inside the lambda will run on the executor
+      reader.read(file, requiredSchema, partitionSchema, filters, broadcastedHadoopConf.value.value)
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
new file mode 100644
index 00000000000..1c755ce3c8f
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkParquetReader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.util.JavaConversions;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("functional")
+public class TestSparkParquetReader extends TestBootstrapReadBase {
+
+  @Test
+  public void testReader() {
+    dataGen = new HoodieTestDataGenerator(dashPartitionPaths);
+    int n = 10;
+    Dataset<Row> inserts = makeInsertDf("000", n);
+    inserts.write().format("parquet").save(bootstrapBasePath);
+    Dataset<Row> parquetReadRows = JavaConversions.createTestDataFrame(sparkSession, bootstrapBasePath);
+    Dataset<Row> datasourceReadRows = sparkSession.read().format("parquet").load(bootstrapBasePath);
+    assertEquals(datasourceReadRows.count(), n);
+    assertEquals(parquetReadRows.count(), n);
+    assertEquals(datasourceReadRows.except(parquetReadRows).count(), 0);
+    assertEquals(parquetReadRows.except(datasourceReadRows).count(), 0);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
index c9abe090975..52333e72628 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/JavaConversions.scala
@@ -18,9 +18,14 @@
 
 package org.apache.hudi.util
 
+import org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import java.util.function.Predicate
 
-object JavaConversions {
+object JavaConversions extends SparkAdapterSupport {
   def getPredicate[T](function1: (T) => Boolean): Predicate[T] = {
     new Predicate[T] {
       override def test(t: T): Boolean = function1.apply(t)
@@ -34,4 +39,19 @@ object JavaConversions {
       }
     }
   }
+
+  /**
+   * Read parquet files using [[TestSparkParquetReaderFormat]]
+   *
+   * @param sparkSession the spark session
+   * @param paths comma separated list of parquet files or directories containing parquet files
+   * @return dataframe containing the data from the input paths
+   */
+  def createTestDataFrame(sparkSession: SparkSession, paths: String): DataFrame = {
+    sparkSession.sqlContext.baseRelationToDataFrame(DataSource.apply(
+      sparkSession = sparkSession,
+      className = "org.apache.spark.sql.execution.datasources.parquet.TestSparkParquetReaderFormat",
+      paths =  paths.split(",").toSeq
+    ).resolveRelation())
+  }
 }
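
A quick hypothetical way to exercise the new helper from Scala (the path and application name are illustrative, and a parquet directory is assumed to already exist there):

    import org.apache.hudi.util.JavaConversions
    import org.apache.spark.sql.SparkSession

    object TestReadExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("reader-test").getOrCreate()
        // Multiple directories may be passed as a comma separated string.
        val df = JavaConversions.createTestDataFrame(spark, "/tmp/parquet-input")
        df.show()
        spark.stop()
      }
    }
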
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index 932e3dd05f0..7b5f5847562 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,10 +32,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable}
 import org.apache.spark.sql.catalyst.util.DateFormatter
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24LegacyHoodieParquetFileFormat, Spark24ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.HoodieExtendedParserInterface
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
@@ -205,4 +207,20 @@ class Spark2Adapter extends SparkAdapter {
     batch.setNumRows(numRows)
     batch
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): SparkParquetReader = {
+    Spark24ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
new file mode 100644
index 00000000000..7fa30a36222
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import java.net.URI
+
+class Spark24ParquetReader(enableVectorizedReader: Boolean,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringStartWith: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file
+   * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v2.4.8 adapted here
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  protected def doRead(file: PartitionedFile,
+                       requiredSchema: StructType,
+                       partitionSchema: StructType,
+                       filters: Seq[Filter],
+                       sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val fileSplit =
+      new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+    val filePath = fileSplit.getPath
+
+    val split =
+      new org.apache.parquet.hadoop.ParquetInputSplit(
+        filePath,
+        fileSplit.getStart,
+        fileSplit.getStart + fileSplit.getLength,
+        fileSplit.getLength,
+        fileSplit.getLocations,
+        null)
+
+    lazy val footerFileMetaData =
+      ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
+        pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(parquetSchema, _))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        Some(DateTimeUtils.getTimeZone(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      val iter = new RecordReaderIterator(vectorizedReader)
+      // SPARK-23457 Register a task completion listener before `initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      vectorizedReader.initialize(split, hadoopAttemptContext)
+      vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+      if (returningBatch) {
+        vectorizedReader.enableReturningBatches()
+      }
+
+      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+      iter.asInstanceOf[Iterator[InternalRow]]
+    } else {
+      // ParquetRecordReader returns UnsafeRow
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+      } else {
+        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+      }
+      val iter = new RecordReaderIterator(reader)
+      // SPARK-23457 Register a task completion listener before `initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      reader.initialize(split, hadoopAttemptContext)
+
+      val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+      val joinedRow = new JoinedRow()
+      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+      // This is a horrible erasure hack...  if we type the iterator above, then it actually checks
+      // the type in next() and we get a class cast exception.  If we make that function return
+      // Object, then we can defer the cast until later!
+      if (partitionSchema.length == 0) {
+        // There are no partition columns
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        iter.asInstanceOf[Iterator[InternalRow]]
+          .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+      }
+    }
+  }
+}
+
+object Spark24ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    //set hadoopconf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+
+    new Spark24ParquetReader(
+      enableVectorizedReader = vectorized,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
index 22a9f090fb3..8fbcf5a060b 100644
--- a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_0Adapter.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark30HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_0AvroDeserializer, HoodieSpark3_0AvroSerializer}
@@ -29,10 +30,11 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark30LegacyHoodieParquetFileFormat, Spark30ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark30PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
 import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_0ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -118,4 +120,20 @@ class Spark3_0Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): SparkParquetReader = {
+    Spark30ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
new file mode 100644
index 00000000000..9088676c335
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark30ParquetReader(enableVectorizedReader: Boolean,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringStartWith: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file
+   * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.0.3 adapted here
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  protected def doRead(file: PartitionedFile,
+                       requiredSchema: StructType,
+                       partitionSchema: StructType,
+                       filters: Seq[Filter],
+                       sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = new Path(new URI(file.filePath))
+    val split =
+      new org.apache.parquet.hadoop.ParquetInputSplit(
+        filePath,
+        file.start,
+        file.start + file.length,
+        file.length,
+        Array.empty,
+        null)
+
+    lazy val footerFileMetaData =
+      ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
+        pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseMode.toString,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      val iter = new RecordReaderIterator(vectorizedReader)
+      // SPARK-23457 Register a task completion listener before `initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      vectorizedReader.initialize(split, hadoopAttemptContext)
+      vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+      if (returningBatch) {
+        vectorizedReader.enableReturningBatches()
+      }
+
+      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+      iter.asInstanceOf[Iterator[InternalRow]]
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz, enableVectorizedReader = false, datetimeRebaseMode)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val iter = new RecordReaderIterator[InternalRow](reader)
+      // SPARK-23457 Register a task completion listener before `initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      reader.initialize(split, hadoopAttemptContext)
+
+      val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+      if (partitionSchema.length == 0) {
+        // There are no partition columns
+        iter.map(unsafeProjection)
+      } else {
+        val joinedRow = new JoinedRow()
+        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+      }
+    }
+  }
+
+}
+
+object Spark30ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    //set hadoopconf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+
+    new Spark30ParquetReader(
+      enableVectorizedReader = vectorized,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
index 8ca072333d0..21f897afe1c 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_1Adapter.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark31HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3_1AvroDeserializer, HoodieSpark3_1AvroSerializer}
@@ -30,10 +31,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark31LegacyHoodieParquetFileFormat, Spark31ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HoodieSpark31PartitionedFileUtils, HoodieSparkPartitionedFileUtils, PartitionedFile}
 import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_1ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType}
 import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -121,4 +123,19 @@ class Spark3_1Adapter extends BaseSpark3Adapter {
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level")
   }
 
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): SparkParquetReader = {
+    Spark31ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
new file mode 100644
index 00000000000..94a5efaee61
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31ParquetReader.scala
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark31ParquetReader(enableVectorizedReader: Boolean,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringStartWith: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file
+   * Code from ParquetFileFormat#buildReaderWithPartitionValues from Spark v3.1.3 adapted here
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The output type says [[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  protected def doRead(file: PartitionedFile,
+                       requiredSchema: StructType,
+                       partitionSchema: StructType,
+                       filters: Seq[Filter],
+                       sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = new Path(new URI(file.filePath))
+    val split =
+      new org.apache.parquet.hadoop.ParquetInputSplit(
+        filePath,
+        file.start,
+        file.start + file.length,
+        file.length,
+        Array.empty,
+        null)
+
+    lazy val footerFileMetaData =
+      ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ))
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseMode)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as a function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val int96RebaseMode = DataSourceUtils.int96RebaseMode(
+      footerFileMetaData.getKeyValueMetaData.get,
+      SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseMode.toString,
+        int96RebaseMode.toString,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      val iter = new RecordReaderIterator(vectorizedReader)
+      // SPARK-23457 Register a task completion listener before 
`initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      vectorizedReader.initialize(split, hadoopAttemptContext)
+      vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+      if (returningBatch) {
+        vectorizedReader.enableReturningBatches()
+      }
+
+      // UnsafeRowParquetRecordReader appends the columns internally to avoid 
another copy.
+      iter.asInstanceOf[Iterator[InternalRow]]
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz,
+        enableVectorizedReader = false,
+        datetimeRebaseMode,
+        int96RebaseMode)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val iter = new RecordReaderIterator[InternalRow](reader)
+      // SPARK-23457 Register a task completion listener before 
`initialization`.
+      taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+      reader.initialize(split, hadoopAttemptContext)
+
+      val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
+
+      if (partitionSchema.length == 0) {
+        // There are no partition columns
+        iter.map(unsafeProjection)
+      } else {
+        val joinedRow = new JoinedRow()
+        iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+      }
+    }
+  }
+}
+
+object Spark31ParquetReader extends SparkParquetReaderBuilder {
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    // set hadoop conf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, 
sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, 
sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, 
sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, 
sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
sqlConf.isParquetINT96AsTimestamp)
+    new Spark31ParquetReader(
+      enableVectorizedReader = vectorized,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
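
For reference, a minimal sketch of how a reader produced by the builder above could be obtained on the read path. SQLConf.get and the empty options map are placeholders, and the commented-out read(...) call assumes SparkParquetReader exposes a public method mirroring the protected doRead shown here, which this excerpt does not show.

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.execution.datasources.parquet.Spark31ParquetReader
    import org.apache.spark.sql.internal.SQLConf

    // Build a reader once, then reuse it for every parquet file handed to the task.
    val hadoopConf = new Configuration()
    val reader = Spark31ParquetReader.build(
      vectorized = false,                  // plain InternalRow iterator, no ColumnarBatch
      sqlConf = SQLConf.get,
      options = Map.empty[String, String],
      hadoopConf = hadoopConf)

    // Hypothetical per-file call mirroring doRead's signature:
    // val rows = reader.read(partitionedFile, requiredSchema, partitionSchema, filters, hadoopConf)
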
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
index 3a5812a5faa..ea486c7383b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_2Adapter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark32HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -30,10 +31,11 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark32LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark32LegacyHoodieParquetFileFormat, Spark32ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_2ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarUtils
@@ -123,4 +125,20 @@ class Spark3_2Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): 
SparkParquetReader = {
+    Spark32ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
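
The override above is what keeps call sites version-agnostic. A small sketch of such a call site, assuming the usual Hudi SparkAdapterSupport mixin is how the adapter is obtained (the mixin usage and the ParquetReaderFactory name are illustrative, not part of this change):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.SparkAdapterSupport
    import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
    import org.apache.spark.sql.internal.SQLConf

    object ParquetReaderFactory extends SparkAdapterSupport {
      // The caller never names Spark32ParquetReader directly; the adapter resolves
      // the implementation matching the Spark version on the classpath.
      def create(vectorized: Boolean, hadoopConf: Configuration): SparkParquetReader =
        sparkAdapter.createParquetFileReader(vectorized, SQLConf.get, Map.empty[String, String], hadoopConf)
    }
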
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
new file mode 100644
index 00000000000..5437a18cd4b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark32ParquetReader.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark32ParquetReader(enableVectorizedReader: Boolean,
+                           datetimeRebaseModeInRead: String,
+                           int96RebaseModeInRead: String,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringStartWith: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends 
SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file.
+   * Adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.2.4.
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The declared element type is [[InternalRow]], but elements may be [[ColumnarBatch]] when vectorized reading is enabled
+   */
+  protected def doRead(file: PartitionedFile,
+                      requiredSchema: StructType,
+                      partitionSchema: StructType,
+                      filters: Seq[Filter],
+                      sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = new Path(new URI(file.filePath))
+    val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
+
+    lazy val footerFileMetaData =
+      ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseSpec)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as a function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      int96RebaseModeInRead)
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseSpec.mode.toString,
+        datetimeRebaseSpec.timeZone,
+        int96RebaseSpec.mode.toString,
+        int96RebaseSpec.timeZone,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      // SPARK-37089: We cannot register a task completion listener to close 
this iterator here
+      // because downstream exec nodes have already registered their 
listeners. Since listeners
+      // are executed in reverse order of registration, a listener registered 
here would close the
+      // iterator while downstream exec nodes are still running. When off-heap 
column vectors are
+      // enabled, this can cause a use-after-free bug leading to a segfault.
+      //
+      // Instead, we use FileScanRDD's task completion listener to close this 
iterator.
+      val iter = new RecordReaderIterator(vectorizedReader)
+      try {
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to 
avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz,
+        enableVectorizedReader = false,
+        datetimeRebaseSpec,
+        int96RebaseSpec)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val iter = new RecordReaderIterator[InternalRow](reader)
+      try {
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There are no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        }
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    }
+  }
+}
+
+object Spark32ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    // set hadoop conf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, 
sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, 
sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, 
sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, 
sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
sqlConf.isParquetINT96AsTimestamp)
+    // Using string value of this conf to preserve compatibility across spark 
versions. See [HUDI-5868]
+    hadoopConf.setBoolean(
+      "spark.sql.legacy.parquet.nanosAsLong",
+      sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", 
"false").toBoolean
+    )
+
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+    new Spark32ParquetReader(
+      enableVectorizedReader = vectorized,
+      datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+      int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
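
The pushed-filter construction above hinges on the fact that only some Spark filters translate into Parquet predicates. A self-contained toy version of that flatMap/reduceOption pattern; the id column and the hand-written converter are stand-ins for what ParquetFilters.createFilter does:

    import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
    import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

    val idCol = FilterApi.intColumn("id")

    // Stand-in for ParquetFilters.createFilter: not every Spark filter has a
    // Parquet equivalent, hence the Option return type.
    def toParquet(f: Filter): Option[FilterPredicate] = f match {
      case EqualTo("id", v: Int)     => Some(FilterApi.eq(idCol, Int.box(v)))
      case GreaterThan("id", v: Int) => Some(FilterApi.gt(idCol, Int.box(v)))
      case _                         => None // unsupported filters are simply dropped
    }

    val filters: Seq[Filter] = Seq(EqualTo("id", 5), GreaterThan("id", 1))
    // Same shape as doRead: flatMap keeps the convertible filters and reduceOption
    // AND-combines them; the result is None when nothing could be converted.
    val pushed: Option[FilterPredicate] = filters.flatMap(toParquet).reduceOption(FilterApi.and)
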
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index e3d2cc9cd18..c11c404c33a 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark33HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -30,10 +31,11 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark33LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark33LegacyHoodieParquetFileFormat, Spark33ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_3ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
@@ -124,4 +126,20 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): 
SparkParquetReader = {
+    Spark33ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
new file mode 100644
index 00000000000..0bd8cca3599
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33ParquetReader.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import java.net.URI
+
+class Spark33ParquetReader(enableVectorizedReader: Boolean,
+                           datetimeRebaseModeInRead: String,
+                           int96RebaseModeInRead: String,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringStartWith: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends 
SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file.
+   * Adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.3.4.
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The declared element type is [[InternalRow]], but elements may be [[ColumnarBatch]] when vectorized reading is enabled
+   */
+  protected def doRead(file: PartitionedFile,
+                      requiredSchema: StructType,
+                      partitionSchema: StructType,
+                      filters: Seq[Filter],
+                      sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = new Path(new URI(file.filePath))
+    val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
+
+
+    lazy val footerFileMetaData =
+      ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringStartWith,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseSpec)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as a function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      int96RebaseModeInRead)
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseSpec.mode.toString,
+        datetimeRebaseSpec.timeZone,
+        int96RebaseSpec.mode.toString,
+        int96RebaseSpec.timeZone,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      // SPARK-37089: We cannot register a task completion listener to close 
this iterator here
+      // because downstream exec nodes have already registered their 
listeners. Since listeners
+      // are executed in reverse order of registration, a listener registered 
here would close the
+      // iterator while downstream exec nodes are still running. When off-heap 
column vectors are
+      // enabled, this can cause a use-after-free bug leading to a segfault.
+      //
+      // Instead, we use FileScanRDD's task completion listener to close this 
iterator.
+      val iter = new RecordReaderIterator(vectorizedReader)
+      try {
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to 
avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz,
+        enableVectorizedReader = false,
+        datetimeRebaseSpec,
+        int96RebaseSpec)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val iter = new RecordReaderIterator[InternalRow](reader)
+      try {
+        reader.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There are no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        }
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    }
+  }
+}
+
+object Spark33ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    // set hadoop conf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, 
sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, 
sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, 
sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, 
sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
sqlConf.isParquetINT96AsTimestamp)
+    // Using string value of this conf to preserve compatibility across spark 
versions. See [HUDI-5868]
+    hadoopConf.setBoolean(
+      "spark.sql.legacy.parquet.nanosAsLong",
+      sqlConf.getConfString("spark.sql.legacy.parquet.nanosAsLong", 
"false").toBoolean
+    )
+
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+    new Spark33ParquetReader(enableVectorizedReader = vectorized,
+      datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+      int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = sqlConf.parquetVectorizedReaderEnabled,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
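
In the row-based branch above, partition values are appended through a JoinedRow plus one generated projection. A standalone sketch of just that step, with made-up schemas and values; requiredSchema.toAttributes matches the Spark 3.3 API used here (the Spark 3.5 reader later in this diff goes through DataTypeUtils.toAttributes instead):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.JoinedRow
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val requiredSchema  = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val partitionSchema = StructType(Seq(StructField("dt", StringType)))

    // One projection over data + partition attributes, generated once per file.
    val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

    val dataRow         = InternalRow(1, UTF8String.fromString("a"))
    val partitionValues = InternalRow(UTF8String.fromString("2024-04-12"))

    // Partition values are joined onto the end of the row, then the combined
    // row is flattened into a single UnsafeRow, exactly as in doRead above.
    val joined = new JoinedRow()
    val outputRow = unsafeProjection(joined(dataRow, partitionValues))
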
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 0ae5ef3dbf3..1e2807df55b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark34HoodieFileScanRDD
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -29,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark34LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark34LegacyHoodieParquetFileFormat, Spark34ParquetReader, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
@@ -37,6 +38,7 @@ import 
org.apache.spark.sql.parser.{HoodieExtendedParserInterface, HoodieSpark3_
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
 import org.apache.spark.sql._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
@@ -124,4 +126,20 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): 
SparkParquetReader = {
+    Spark34ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
new file mode 100644
index 00000000000..73db889a044
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34ParquetReader.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+class Spark34ParquetReader(enableVectorizedReader: Boolean,
+                           datetimeRebaseModeInRead: String,
+                           int96RebaseModeInRead: String,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringPredicate: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends 
SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file.
+   * Adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.4.2.
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file. The declared element type is [[InternalRow]], but elements may be [[ColumnarBatch]] when vectorized reading is enabled
+   */
+  protected def doRead(file: PartitionedFile,
+                      requiredSchema: StructType,
+                      partitionSchema: StructType,
+                      filters: Seq[Filter],
+                      sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = file.toPath
+    val split = new FileSplit(filePath, file.start, file.length, 
Array.empty[String])
+
+
+    lazy val footerFileMetaData =
+      ParquetFooterReader.readFooter(sharedConf, filePath, 
SKIP_ROW_GROUPS).getFileMetaData
+    val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringPredicate,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseSpec)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not 
all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). 
That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as a function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        
Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+    val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      int96RebaseModeInRead)
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 
0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, 
pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseSpec.mode.toString,
+        datetimeRebaseSpec.timeZone,
+        int96RebaseSpec.mode.toString,
+        int96RebaseSpec.timeZone,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      // SPARK-37089: We cannot register a task completion listener to close 
this iterator here
+      // because downstream exec nodes have already registered their 
listeners. Since listeners
+      // are executed in reverse order of registration, a listener registered 
here would close the
+      // iterator while downstream exec nodes are still running. When off-heap 
column vectors are
+      // enabled, this can cause a use-after-free bug leading to a segfault.
+      //
+      // Instead, we use FileScanRDD's task completion listener to close this 
iterator.
+      val iter = new RecordReaderIterator(vectorizedReader)
+      try {
+        vectorizedReader.initialize(split, hadoopAttemptContext)
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to 
avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz,
+        enableVectorizedReader = false,
+        datetimeRebaseSpec,
+        int96RebaseSpec)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val readerWithRowIndexes = 
ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+        requiredSchema)
+      val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+      try {
+        readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = requiredSchema.toAttributes ++ 
partitionSchema.toAttributes
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There are no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        }
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, 
close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    }
+  }
+}
+
+object Spark34ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    // set hadoop conf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, 
classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, 
sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, 
sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, 
sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, 
sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, 
sqlConf.isParquetINT96AsTimestamp)
+    // Using string value of this conf to preserve compatibility across spark 
versions. See [HUDI-5868]
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.getConfString(
+        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean
+    )
+    hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, 
sqlConf.parquetInferTimestampNTZEnabled)
+
+    val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+      options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+        throw new IllegalArgumentException(
+          "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. 
" +
+            "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false."))
+        .equals("true")
+
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+    new Spark34ParquetReader(
+      enableVectorizedReader = vectorized,
+      datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+      int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = 
sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = returningBatch,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
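
One behavioral difference from the earlier builders: when spark.sql.parquet.enableVectorizedReader is on, the Spark 3.4 build above requires FileFormat.OPTION_RETURNING_BATCH in the options map and throws otherwise. A minimal sketch of a compliant call; the values are illustrative only:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.execution.datasources.parquet.Spark34ParquetReader
    import org.apache.spark.sql.internal.SQLConf

    // "false" keeps the iterator in row mode even if the vectorized reader is on;
    // pass "true" only when the downstream plan can consume ColumnarBatch.
    val options = Map(FileFormat.OPTION_RETURNING_BATCH -> "false")

    val reader = Spark34ParquetReader.build(
      vectorized = true,
      sqlConf = SQLConf.get,
      options = options,
      hadoopConf = new Configuration())
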
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index d18291a1809..a3e0c19621b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.adapter
 
 import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark35HoodieFileScanRDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -31,9 +32,10 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.V2TableWithV1Fallback
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark35LegacyHoodieParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark35ParquetReader, Spark35LegacyHoodieParquetFileFormat, SparkParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.parser.{HoodieExtendedParserInterface, 
HoodieSpark3_5ExtendedSqlParser}
 import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, 
StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatchRow
@@ -124,4 +126,20 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
     case OFF_HEAP => "OFF_HEAP"
     case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: 
$level")
   }
+
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited by the schema, reading mode, etc.
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    options passed to the file format
+   * @param hadoopConf hadoop configuration; some entries will be set on it for the read
+   * @return parquet file reader
+   */
+  override def createParquetFileReader(vectorized: Boolean,
+                                       sqlConf: SQLConf,
+                                       options: Map[String, String],
+                                       hadoopConf: Configuration): 
SparkParquetReader = {
+    Spark35ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
new file mode 100644
index 00000000000..f088efd07e1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35ParquetReader.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.FileSplit
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileFormat, PartitionedFile, RecordReaderIterator}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+class Spark35ParquetReader(enableVectorizedReader: Boolean,
+                           datetimeRebaseModeInRead: String,
+                           int96RebaseModeInRead: String,
+                           enableParquetFilterPushDown: Boolean,
+                           pushDownDate: Boolean,
+                           pushDownTimestamp: Boolean,
+                           pushDownDecimal: Boolean,
+                           pushDownInFilterThreshold: Int,
+                           pushDownStringPredicate: Boolean,
+                           isCaseSensitive: Boolean,
+                           timestampConversion: Boolean,
+                           enableOffHeapColumnVector: Boolean,
+                           capacity: Int,
+                           returningBatch: Boolean,
+                           enableRecordFilter: Boolean,
+                           timeZoneId: Option[String]) extends SparkParquetReaderBase(
+  enableVectorizedReader = enableVectorizedReader,
+  enableParquetFilterPushDown = enableParquetFilterPushDown,
+  pushDownDate = pushDownDate,
+  pushDownTimestamp = pushDownTimestamp,
+  pushDownDecimal = pushDownDecimal,
+  pushDownInFilterThreshold = pushDownInFilterThreshold,
+  isCaseSensitive = isCaseSensitive,
+  timestampConversion = timestampConversion,
+  enableOffHeapColumnVector = enableOffHeapColumnVector,
+  capacity = capacity,
+  returningBatch = returningBatch,
+  enableRecordFilter = enableRecordFilter,
+  timeZoneId = timeZoneId) {
+
+  /**
+   * Read an individual parquet file
+   * Code adapted from ParquetFileFormat#buildReaderWithPartitionValues in Spark v3.5.1
+   *
+   * @param file            parquet file to read
+   * @param requiredSchema  desired output schema of the data
+   * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row
+   * @param filters         filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters.
+   * @param sharedConf      the hadoop conf
+   * @return iterator of rows read from the file; the declared output type is [[InternalRow]], but the rows may actually be [[ColumnarBatch]]
+   */
+  protected def doRead(file: PartitionedFile,
+                      requiredSchema: StructType,
+                      partitionSchema: StructType,
+                      filters: Seq[Filter],
+                      sharedConf: Configuration): Iterator[InternalRow] = {
+    assert(file.partitionValues.numFields == partitionSchema.size)
+
+    val filePath = file.toPath
+    val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
+
+    val fileFooter = if (enableVectorizedReader) {
+      // When there are vectorized reads, we can avoid reading the footer twice by reading
+      // all row groups in advance and filter row groups according to filters that require
+      // push down (no need to read the footer metadata again).
+      ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+    } else {
+      ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+    }
+
+    val footerFileMetaData = fileFooter.getFileMetaData
+    val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
+    val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead)
+
+    // Try to push down filters when filter push-down is enabled.
+    val pushed = if (enableParquetFilterPushDown) {
+      val parquetSchema = footerFileMetaData.getSchema
+      val parquetFilters = new ParquetFilters(
+        parquetSchema,
+        pushDownDate,
+        pushDownTimestamp,
+        pushDownDecimal,
+        pushDownStringPredicate,
+        pushDownInFilterThreshold,
+        isCaseSensitive,
+        datetimeRebaseSpec)
+      filters
+        // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+        // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+        // is used here.
+        .flatMap(parquetFilters.createFilter(_))
+        .reduceOption(FilterApi.and)
+    } else {
+      None
+    }
+
+    // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
+    // *only* if the file was created by something other than "parquet-mr", so check the actual
+    // writer here for this file.  We have to do this per-file, as each file in the table may
+    // have different writers.
+    // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads.
+    def isCreatedByParquetMr: Boolean =
+      footerFileMetaData.getCreatedBy().startsWith("parquet-mr")
+
+    val convertTz =
+      if (timestampConversion && !isCreatedByParquetMr) {
+        Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))
+      } else {
+        None
+      }
+
+
+    val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+    val hadoopAttemptContext =
+      new TaskAttemptContextImpl(sharedConf, attemptId)
+
+    // Try to push down filters when filter push-down is enabled.
+    // Notice: This push-down is RowGroups level, not individual records.
+    if (pushed.isDefined) {
+      ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
+    }
+    val taskContext = Option(TaskContext.get())
+    if (enableVectorizedReader) {
+      val vectorizedReader = new VectorizedParquetRecordReader(
+        convertTz.orNull,
+        datetimeRebaseSpec.mode.toString,
+        datetimeRebaseSpec.timeZone,
+        int96RebaseSpec.mode.toString,
+        int96RebaseSpec.timeZone,
+        enableOffHeapColumnVector && taskContext.isDefined,
+        capacity)
+      // SPARK-37089: We cannot register a task completion listener to close this iterator here
+      // because downstream exec nodes have already registered their listeners. Since listeners
+      // are executed in reverse order of registration, a listener registered here would close the
+      // iterator while downstream exec nodes are still running. When off-heap column vectors are
+      // enabled, this can cause a use-after-free bug leading to a segfault.
+      //
+      // Instead, we use FileScanRDD's task completion listener to close this iterator.
+      val iter = new RecordReaderIterator(vectorizedReader)
+      try {
+        vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))
+        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+        if (returningBatch) {
+          vectorizedReader.enableReturningBatches()
+        }
+
+        // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    } else {
+      // ParquetRecordReader returns InternalRow
+      val readSupport = new ParquetReadSupport(
+        convertTz,
+        enableVectorizedReader = false,
+        datetimeRebaseSpec,
+        int96RebaseSpec)
+      val reader = if (pushed.isDefined && enableRecordFilter) {
+        val parquetFilter = FilterCompat.get(pushed.get, null)
+        new ParquetRecordReader[InternalRow](readSupport, parquetFilter)
+      } else {
+        new ParquetRecordReader[InternalRow](readSupport)
+      }
+      val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader,
+        requiredSchema)
+      val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes)
+      try {
+        readerWithRowIndexes.initialize(split, hadoopAttemptContext)
+
+        val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema)
+        val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+        if (partitionSchema.length == 0) {
+          // There is no partition columns
+          iter.map(unsafeProjection)
+        } else {
+          val joinedRow = new JoinedRow()
+          iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues)))
+        }
+      } catch {
+        case e: Throwable =>
+          // SPARK-23457: In case there is an exception in initialization, close the iterator to
+          // avoid leaking resources.
+          iter.close()
+          throw e
+      }
+    }
+  }
+}
+
+object Spark35ParquetReader extends SparkParquetReaderBuilder {
+  /**
+   * Get parquet file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return parquet file reader
+   */
+  def build(vectorized: Boolean,
+            sqlConf: SQLConf,
+            options: Map[String, String],
+            hadoopConf: Configuration): SparkParquetReader = {
+    // set the required configs on the hadoopConf
+    hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
+    hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, sqlConf.sessionLocalTimeZone)
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, sqlConf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis)
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, sqlConf.isParquetBinaryAsString)
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlConf.isParquetINT96AsTimestamp)
+    // Using the string value of this conf to preserve compatibility across spark versions. See [HUDI-5868]
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.getConfString(
+        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+        SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValueString).toBoolean
+    )
+    hadoopConf.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key, sqlConf.parquetInferTimestampNTZEnabled)
+
+    val returningBatch = sqlConf.parquetVectorizedReaderEnabled &&
+      options.getOrElse(FileFormat.OPTION_RETURNING_BATCH,
+        throw new IllegalArgumentException(
+          "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. 
" +
+            "To workaround this issue, set 
spark.sql.parquet.enableVectorizedReader=false."))
+        .equals("true")
+
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+    new Spark35ParquetReader(
+      enableVectorizedReader = vectorized,
+      datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead,
+      int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead,
+      enableParquetFilterPushDown = sqlConf.parquetFilterPushDown,
+      pushDownDate = sqlConf.parquetFilterPushDownDate,
+      pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp,
+      pushDownDecimal = sqlConf.parquetFilterPushDownDecimal,
+      pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold,
+      pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate,
+      isCaseSensitive = sqlConf.caseSensitiveAnalysis,
+      timestampConversion = sqlConf.isParquetINT96TimestampConversion,
+      enableOffHeapColumnVector = sqlConf.offHeapColumnVectorEnabled,
+      capacity = sqlConf.parquetVectorizedReaderBatchSize,
+      returningBatch = returningBatch,
+      enableRecordFilter = sqlConf.parquetRecordFilterEnabled,
+      timeZoneId = Some(sqlConf.sessionLocalTimeZone))
+  }
+}
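
    Continuing the sketch above, reading a single base file would look roughly like the
    following. This is illustrative only: it assumes the public entry point on
    SparkParquetReaderBase mirrors doRead's parameter list under a name like `read`
    (that base class is part of this commit but not reproduced in this message), and the
    PartitionedFile construction is elided because it is Spark-version specific.

        import org.apache.spark.sql.execution.datasources.PartitionedFile
        import org.apache.spark.sql.sources.{Filter, GreaterThan}
        import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

        // Hypothetical single-file read; `reader` and `hadoopConf` come from the earlier sketch.
        val requiredSchema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
        val partitionSchema = StructType(Nil)                 // no partition values appended to each row
        val filters: Seq[Filter] = Seq(GreaterThan("id", 0))  // best-effort pruning only, per the scaladoc
        val file: PartitionedFile = ???                       // one per parquet base file being read
        val rows = reader.read(file, requiredSchema, partitionSchema, filters, hadoopConf) // assumed method name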
