This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7cc87d687 [HUDI-7840] Add position merging to the new file group 
reader (#11413)
0d7cc87d687 is described below

commit 0d7cc87d687bd235bac099e481535cb9f223b501
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri Jun 7 19:47:42 2024 -0400

    [HUDI-7840] Add position merging to the new file group reader (#11413)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com>
---
 .../SparkFileFormatInternalRowReaderContext.scala  | 202 +++++++++++++++----
 .../hudi/common/engine/HoodieReaderContext.java    |  22 +++
 .../common/table/read/HoodieFileGroupReader.java   |   6 +-
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |   4 +-
 .../read/HoodiePositionBasedSchemaHandler.java     |  75 ++++++++
 ...odieFileGroupReaderBasedParquetFileFormat.scala |   2 +-
 ...stSparkFileFormatInternalRowReaderContext.scala |  72 +++++++
 ...stHoodiePositionBasedFileGroupRecordBuffer.java | 214 +++++++++++++++++++++
 .../functional/TestFiltersInFileGroupReader.java   | 109 +++++++++++
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   2 +-
 .../TestSpark35RecordPositionMetadataColumn.scala  | 143 ++++++++++++++
 11 files changed, 812 insertions(+), 39 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 640f1219fbf..715e2d9a9ab 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -22,10 +22,14 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
+import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap,
 getAppliedRequiredSchema}
+import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import 
org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.collection.{ClosableIterator, 
CloseableMappingIterator}
+import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator, CloseableMappingIterator}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, 
StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
@@ -37,7 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
SparkParquetReader}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, 
StructType}
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
 
 import scala.collection.mutable
@@ -53,12 +57,20 @@ import scala.collection.mutable
  *                          not required for reading a file group with only 
log files.
  * @param recordKeyColumn   column name for the recordkey
  * @param filters           spark filters that might be pushed down into the 
reader
+ * @param requiredFilters   filters that are required and should always be 
used, even in merging situations
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: 
SparkParquetReader,
                                               recordKeyColumn: String,
-                                              filters: Seq[Filter]) extends 
BaseSparkInternalRowReaderContext {
+                                              filters: Seq[Filter],
+                                              requiredFilters: Seq[Filter]) 
extends BaseSparkInternalRowReaderContext {
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+  private lazy val bootstrapSafeFilters: Seq[Filter] = 
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = 
mutable.Map()
+  private lazy val allFilters = filters ++ requiredFilters
+
+  override def supportsParquetRowIndex: Boolean = {
+    HoodieSparkUtils.gteqSpark3_5
+  }
 
   override def getFileRecordIterator(filePath: StoragePath,
                                      start: Long,
@@ -66,6 +78,10 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      storage: HoodieStorage): 
ClosableIterator[InternalRow] = {
+    val hasRowIndexField = 
AvroSchemaUtils.containsFieldInSchema(requiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+    if (hasRowIndexField) {
+      assert(supportsParquetRowIndex())
+    }
     val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
     if (FSUtils.isLogFile(filePath)) {
       val projection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
@@ -84,8 +100,20 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
       // each row if they are given. That is the only usage of the partition 
values in the reader.
       val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
         .createPartitionedFile(InternalRow.empty, filePath, start, length)
+      val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
       new CloseableInternalRowIterator(parquetFileReader.read(fileInfo,
-        structType, StructType(Seq.empty), Seq.empty, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+        readSchema, StructType(Seq.empty), readFilters, 
storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+    }
+  }
+
+  private def getSchemaAndFiltersForRead(structType: StructType, 
hasRowIndexField: Boolean): (StructType, Seq[Filter]) = {
+    val schemaForRead = getAppliedRequiredSchema(structType, hasRowIndexField)
+    if (!getHasLogFiles && !getNeedsBootstrapMerge) {
+      (schemaForRead, allFilters)
+    } else if (!getHasLogFiles && hasRowIndexField) {
+      (schemaForRead, bootstrapSafeFilters)
+    } else {
+      (schemaForRead, requiredFilters)
     }
   }
 
@@ -116,45 +144,153 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      skeletonRequiredSchema: Schema,
                                      dataFileIterator: 
ClosableIterator[InternalRow],
                                      dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], 
skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
   }
 
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], 
dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
+  private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                               skeletonRequiredSchema: Schema,
+                               dataFileIterator: ClosableIterator[Any],
+                               dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
+    if (supportsParquetRowIndex()) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val rowIndexColumn = new java.util.HashSet[String]()
+      rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      //always remove the row index column from the skeleton because the data 
file will also have the same column
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, 
rowIndexColumn))
 
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter 
pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      //If we need to do position based merging with log files we will leave 
the row index column at the end
+      val dataProjection = if (getHasLogFiles && 
getShouldMergeUseRecordPosition) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, 
rowIndexColumn))
       }
 
-      override def next(): Any = {
-        (skeletonFileIterator.next(), dataFileIterator.next()) match {
-          case (s: ColumnarBatch, d: ColumnarBatch) =>
-            val numCols = s.numCols() + d.numCols()
-            val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
-            for (i <- 0 until numCols) {
-              if (i < s.numCols()) {
-                vecs(i) = s.column(i)
+      //row index will always be the last column
+      val skeletonRowIndex = skeletonRequiredSchema.getFields.size() - 1
+      val dataRowIndex = dataRequiredSchema.getFields.size() - 1
+
+      //Always use internal row for positional merge because
+      //we need to iterate row by row when merging
+      new CachingIterator[InternalRow] {
+        val combinedRow = new JoinedRow()
+
+        private def getNextSkeleton: (InternalRow, Long) = {
+          val nextSkeletonRow = 
skeletonFileIterator.next().asInstanceOf[InternalRow]
+          (nextSkeletonRow, nextSkeletonRow.getLong(skeletonRowIndex))
+        }
+
+        private def getNextData: (InternalRow, Long) = {
+          val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow]
+          (nextDataRow,  nextDataRow.getLong(dataRowIndex))
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+
+        override protected def doHasNext(): Boolean = {
+          if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
+            false
+          } else {
+            var nextSkeleton = getNextSkeleton
+            var nextData = getNextData
+            while (nextSkeleton._2 != nextData._2) {
+              if (nextSkeleton._2 > nextData._2) {
+                if (!dataFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextData = getNextData
+                }
               } else {
-                vecs(i) = d.column(i - s.numCols())
+                if (!skeletonFileIterator.hasNext) {
+                  return false
+                } else {
+                  nextSkeleton = getNextSkeleton
+                }
               }
             }
-            assert(s.numRows() == d.numRows())
-            sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            nextRecord = 
combinedRow(skeletonProjection.apply(nextSkeleton._1), 
dataProjection.apply(nextData._1))
+            true
+          }
         }
       }
+    } else {
+      new ClosableIterator[Any] {
+        val combinedRow = new JoinedRow()
 
-      override def close(): Unit = {
-        skeletonFileIterator.close()
-        dataFileIterator.close()
-      }
-    }.asInstanceOf[ClosableIterator[InternalRow]]
+        override def hasNext: Boolean = {
+          //If the iterators are out of sync it is probably due to filter 
pushdown
+          checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+            "Bootstrap data-file iterator and skeleton-file iterator have to 
be in-sync!")
+          dataFileIterator.hasNext && skeletonFileIterator.hasNext
+        }
+
+        override def next(): Any = {
+          (skeletonFileIterator.next(), dataFileIterator.next()) match {
+            case (s: ColumnarBatch, d: ColumnarBatch) =>
+              //This will not be used until [HUDI-7693] is implemented
+              val numCols = s.numCols() + d.numCols()
+              val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+              for (i <- 0 until numCols) {
+                if (i < s.numCols()) {
+                  vecs(i) = s.column(i)
+                } else {
+                  vecs(i) = d.column(i - s.numCols())
+                }
+              }
+              assert(s.numRows() == d.numRows())
+              sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+            case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+            case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
+            case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+          }
+        }
+
+        override def close(): Unit = {
+          skeletonFileIterator.close()
+          dataFileIterator.close()
+        }
+      }.asInstanceOf[ClosableIterator[InternalRow]]
+    }
   }
 }
+
+object SparkFileFormatInternalRowReaderContext {
+  // From "namedExpressions.scala": Used to construct the record position field 
metadata.
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  def getAppliedRequiredSchema(requiredSchema: StructType, 
shouldAddRecordPosition: Boolean): StructType = {
+    if (shouldAddRecordPosition) {
+      val metadata = new MetadataBuilder()
+        .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+        .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .build()
+      val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, 
LongType, nullable = false, metadata)
+      StructType(requiredSchema.fields.filterNot(isIndexTempColumn) :+ 
rowIndexField)
+    } else {
+      requiredSchema
+    }
+  }
+
+  /**
+   * Only valid if there is support for RowIndexField and no log files.
+   * A filter is safe for bootstrap if it references only meta columns or only 
data columns, i.e. meta col filters are independent from data col filters.
+   */
+  def filterIsSafeForBootstrap(filter: Filter): Boolean = {
+    val metaRefCount = filter.references.count(c => 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase))
+    metaRefCount == filter.references.length || metaRefCount == 0
+  }
+
+  private def isIndexTempColumn(field: StructField): Boolean = {
+    field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+  }
+
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 81db7ef9e70..218e0eb4b03 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -58,6 +58,7 @@ public abstract class HoodieReaderContext<T> {
   private Boolean hasLogFiles = null;
   private Boolean hasBootstrapBaseFile = null;
   private Boolean needsBootstrapMerge = null;
+  private Boolean shouldMergeUseRecordPosition = null;
 
   // Getter and Setter for schemaHandler
   public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
@@ -122,6 +123,15 @@ public abstract class HoodieReaderContext<T> {
     this.needsBootstrapMerge = needsBootstrapMerge;
   }
 
+  // Getter and Setter for shouldMergeUseRecordPosition
+  public boolean getShouldMergeUseRecordPosition() {
+    return shouldMergeUseRecordPosition;
+  }
+
+  public void setShouldMergeUseRecordPosition(boolean 
shouldMergeUseRecordPosition) {
+    this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition;
+  }
+
   // These internal key names are only used in memory for record metadata and 
merging,
   // and should not be persisted to storage.
   public static final String INTERNAL_META_RECORD_KEY = "_0";
@@ -301,9 +311,21 @@ public abstract class HoodieReaderContext<T> {
    * @return the record position in the base file.
    */
   public long extractRecordPosition(T record, Schema schema, String fieldName, 
long providedPositionIfNeeded) {
+    if (supportsParquetRowIndex()) {
+      Object position = getValue(record, schema, fieldName);
+      if (position != null) {
+        return (long) position;
+      } else {
+        throw new IllegalStateException("Record position extraction failed");
+      }
+    }
     return providedPositionIfNeeded;
   }
 
+  public boolean supportsParquetRowIndex() {
+    return false;
+  }
+
   /**
    * Constructs engine specific delete record.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index fe450bb0165..396da4166a7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -106,10 +106,12 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     readerContext.setRecordMerger(this.recordMerger);
     readerContext.setTablePath(tablePath);
     readerContext.setLatestCommitTime(latestCommitTime);
+    readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition);
     readerContext.setHasLogFiles(!this.logFiles.isEmpty());
     readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && 
hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
-    readerContext.setSchemaHandler(new 
HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema,
-        requestedSchema, internalSchemaOpt, tableConfig));
+    readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
+        ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig)
+        : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig));
     this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
     this.recordBuffer = this.logFiles.isEmpty()
         ? null
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 29f05b015ed..bc25bb96f5e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -53,7 +53,7 @@ import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STR
  * {@link #hasNext} method is called.
  */
 public class HoodiePositionBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
-  public static final String ROW_INDEX_COLUMN_NAME = "row_index";
+  private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = 
"_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
   private long nextRecordPosition = 0L;
 
@@ -180,7 +180,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
     // Handle merging.
     while (baseFileIterator.hasNext()) {
       T baseRecord = baseFileIterator.next();
-      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition);
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(nextRecordPosition++);
       if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
         return true;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
new file mode 100644
index 00000000000..87c4266e350
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.avro.Schema;
+
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
+
+/**
+ * This class is responsible for handling the schema for the file group reader 
that supports positional merge.
+ */
+public class HoodiePositionBasedSchemaHandler<T> extends 
HoodieFileGroupReaderSchemaHandler<T> {
+  public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
+                                          Schema dataSchema,
+                                          Schema requestedSchema,
+                                          Option<InternalSchema> 
internalSchemaOpt,
+                                          HoodieTableConfig hoodieTableConfig) 
{
+    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, 
hoodieTableConfig);
+  }
+
+  @Override
+  protected Schema prepareRequiredSchema() {
+    Schema preMergeSchema = super.prepareRequiredSchema();
+    return readerContext.getShouldMergeUseRecordPosition() && 
readerContext.getHasLogFiles()
+        ? addPositionalMergeCol(preMergeSchema)
+        : preMergeSchema;
+  }
+
+  @Override
+  public Pair<List<Schema.Field>,List<Schema.Field>> 
getBootstrapRequiredFields() {
+    Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = 
super.getBootstrapRequiredFields();
+    if (readerContext.supportsParquetRowIndex()) {
+      if (!dataAndMetaCols.getLeft().isEmpty() && 
!dataAndMetaCols.getRight().isEmpty()) {
+        dataAndMetaCols.getLeft().add(getPositionalMergeField());
+        dataAndMetaCols.getRight().add(getPositionalMergeField());
+      }
+    }
+    return dataAndMetaCols;
+  }
+
+  private static Schema addPositionalMergeCol(Schema input) {
+    return appendFieldsToSchemaDedupNested(input, 
Collections.singletonList(getPositionalMergeField()));
+  }
+
+  private static Schema.Field getPositionalMergeField() {
+    return new 
Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME,
+        Schema.create(Schema.Type.LONG), "", -1L);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 0412d202148..c9e001dde59 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -149,7 +149,7 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                 val hoodieBaseFile = fileSlice.getBaseFile.get()
                 
baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, 
hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
               } else {
-                val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, 
tableState.recordKeyField, filters)
+                val readerContext = new 
SparkFileFormatInternalRowReaderContext(parquetFileReader.value, 
tableState.recordKeyField, filters, requiredFilters)
                 val storageConf = broadcastedStorageConf.value
                 val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
                   
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
new file mode 100644
index 00000000000..abf9d238dd6
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.execution.datasources.parquet
+
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext
+import 
org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
+import org.apache.hudi.common.model.HoodieRecord
+import 
org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.sources.{And, IsNotNull, Or}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestSparkFileFormatInternalRowReaderContext extends 
SparkClientFunctionalTestHarness {
+
+  @Test
+  def testBootstrapFilters(): Unit = {
+    val recordKeyField = 
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName
+    val commitTimeField = 
HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName
+
+    val recordKeyFilter = IsNotNull(recordKeyField)
+    assertTrue(filterIsSafeForBootstrap(recordKeyFilter))
+    val commitTimeFilter = IsNotNull(commitTimeField)
+    assertTrue(filterIsSafeForBootstrap(commitTimeFilter))
+
+    val dataFieldFilter = IsNotNull("someotherfield")
+    assertTrue(filterIsSafeForBootstrap(dataFieldFilter))
+
+    val legalComplexFilter = Or(recordKeyFilter, commitTimeFilter)
+    assertTrue(filterIsSafeForBootstrap(legalComplexFilter))
+
+    val illegalComplexFilter = Or(recordKeyFilter, dataFieldFilter)
+    assertFalse(filterIsSafeForBootstrap(illegalComplexFilter))
+
+    val illegalNestedFilter = And(legalComplexFilter, illegalComplexFilter)
+    assertFalse(filterIsSafeForBootstrap(illegalNestedFilter))
+
+    val legalNestedFilter = And(legalComplexFilter, recordKeyFilter)
+    assertTrue(filterIsSafeForBootstrap(legalNestedFilter))
+  }
+
+  @Test
+  def testGetAppliedRequiredSchema(): Unit = {
+    val fields = Array(
+      StructField("column_a", LongType, nullable = false),
+      StructField("column_b", StringType, nullable = false))
+    val requiredSchema = StructType(fields)
+
+    val appliedSchema: StructType = 
SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(
+      requiredSchema, true)
+    assertEquals(3, appliedSchema.fields.length)
+    assertTrue(appliedSchema.fields.map(f => 
f.name).contains(ROW_INDEX_TEMPORARY_COLUMN_NAME))
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..e59e65bea3e
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import 
org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodiePositionBasedFileGroupRecordBuffer extends 
TestHoodieFileGroupReaderOnSpark {
+  private final HoodieTestDataGenerator dataGen = new 
HoodieTestDataGenerator(0xDEEF);
+  private HoodieTableMetaClient metaClient;
+  private Schema avroSchema;
+  private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
+  private String partitionPath;
+
+  public void prepareBuffer(boolean useCustomMerger) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>();
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
"parquet");
+    writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"_row_key");
+    writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
"partition_path");
+    writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+    writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
+    writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
+    writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
+    writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
+    writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+    writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
+    writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
+    writeConfigs.put("hoodie.compact.inline", "false");
+    writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
+    commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), 
INSERT.value(), writeConfigs);
+
+    String[] partitionPaths = dataGen.getPartitionPaths();
+    String[] partitionValues = new String[1];
+    partitionPath = partitionPaths[0];
+    partitionValues[0] = partitionPath;
+
+    metaClient = createMetaClient(getStorageConf(), getBasePath());
+    avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    Option<String[]> partitionFields = 
metaClient.getTableConfig().getPartitionFields();
+    Option<String> partitionNameOpt = 
StringUtils.isNullOrEmpty(partitionPaths[0])
+        ? Option.empty() : Option.of(partitionPaths[0]);
+
+    HoodieReaderContext ctx = getHoodieReaderContext(getBasePath(), 
avroSchema, getStorageConf());
+    ctx.setTablePath(metaClient.getBasePathV2().toString());
+    ctx.setLatestCommitTime(metaClient.createNewInstantTime());
+    ctx.setShouldMergeUseRecordPosition(true);
+    ctx.setHasBootstrapBaseFile(false);
+    ctx.setHasLogFiles(true);
+    ctx.setNeedsBootstrapMerge(false);
+    ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new 
HoodieSparkRecordMerger());
+    ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, 
avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig()));
+    buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
+        ctx,
+        metaClient,
+        partitionNameOpt,
+        partitionFields,
+        ctx.getRecordMerger(),
+        new TypedProperties(),
+        1024 * 1024 * 1000,
+        metaClient.getTempFolderPath(),
+        ExternalSpillableMap.DiskMapType.ROCKS_DB,
+        false);
+  }
+
+  public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
avroSchema.toString());
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    return header;
+  }
+
+  public List<DeleteRecord> getDeleteRecords() throws IOException, 
URISyntaxException {
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 100);
+
+    List<DeleteRecord> deletedRecords = records.stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) 
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) 
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+    return deletedRecords;
+  }
+
+  public HoodieDeleteBlock getDeleteBlockWithPositions() throws IOException, 
URISyntaxException {
+    List<DeleteRecord> deletedRecords = getDeleteRecords();
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+
+    long position = 0;
+    for (DeleteRecord dr : deletedRecords) {
+      deleteRecordList.add(Pair.of(dr, position++));
+    }
+    return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+  }
+
+  public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws 
IOException, URISyntaxException {
+    List<DeleteRecord> deletedRecords = getDeleteRecords();
+    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+
+    for (DeleteRecord dr : deletedRecords) {
+      deleteRecordList.add(Pair.of(dr, -1L));
+    }
+    return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+  }
+
+  @Test
+  public void testProcessDeleteBlockWithPositions() throws Exception {
+    prepareBuffer(false);
+    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+    buffer.processDeleteBlock(deleteBlock);
+    assertEquals(50, buffer.getLogRecords().size());
+    // With record positions, we do not need the record keys.
+    
assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+  }
+
+  @Test
+  public void testProcessDeleteBlockWithCustomMerger() throws Exception {
+    prepareBuffer(true);
+    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+    buffer.processDeleteBlock(deleteBlock);
+    assertEquals(50, buffer.getLogRecords().size());
+    
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+  }
+
+  @Test
+  public void testProcessDeleteBlockWithoutPositions() throws Exception {
+    prepareBuffer(false);
+    HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
+    Exception exception = assertThrows(
+        HoodieValidationException.class, () -> 
buffer.processDeleteBlock(deleteBlock));
+    assertTrue(exception.getMessage().contains("No record position info is 
found"));
+  }
+
+  public static class CustomMerger implements HoodieRecordMerger {
+    @Override
+    public String getMergingStrategy() {
+      return "random_strategy";
+    }
+
+    @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(
+        HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema 
newSchema, TypedProperties props
+    ) throws IOException {
+      throw new IOException("Not implemented");
+    }
+
+    @Override
+    public HoodieRecord.HoodieRecordType getRecordType() {
+      return HoodieRecord.HoodieRecordType.SPARK;
+    }
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
new file mode 100644
index 00000000000..b8ca6373237
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Map;
+
+
+/**
+ * Ensure that parquet filters are not being pushed down when they shouldn't be.
+ *
+ * <p>Each comparison reads the same table twice with the same predicate, once with the file
+ * group reader enabled and once with it disabled, and compares the two results.
+ */
+@Tag("functional")
+public class TestFiltersInFileGroupReader extends TestBootstrapReadBase {
+
+  // For bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data.
+  // The record key predicate is there so that filter pushdown on MOR tables is exercised
+  // together with position based merging.
+  // NOTE(review): the original comment put the chance of a uuid in the 8-4-4-4-12 format
+  // containing "00" at around 90%; a rough estimate for random hex digits suggests closer
+  // to 10% - confirm.
+  private static final String AND_FILTER = "begin_lat > 0.5 and _hoodie_record_key LIKE '%00%'";
+  private static final String OR_FILTER = "begin_lat > 0.5 or _hoodie_record_key LIKE '%00%'";
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testFiltersInFileFormat(boolean mergeUseRecordPositions) {
+    this.bootstrapType = "mixed";
+    this.dashPartitions = true;
+    this.tableType = HoodieTableType.MERGE_ON_READ;
+    this.nPartitions = 2;
+    this.nInserts = 100000;
+    this.nUpdates = 20000;
+    sparkSession.conf().set(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(), "true");
+    setupDirs();
+
+    //do bootstrap
+    Map<String, String> options = setBootstrapOptions();
+    Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
+    bootstrapDf.write().format("hudi")
+        .options(options)
+        .mode(SaveMode.Overwrite)
+        .save(bootstrapTargetPath);
+    runComparison(mergeUseRecordPositions);
+
+    // Subsequent writes use the same positions setting for writing and for merging.
+    options = basicOptions();
+    options.put(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions));
+    options.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions));
+
+    doUpdate(options, "001");
+    runComparison(mergeUseRecordPositions);
+
+    doInsert(options, "002");
+    runComparison(mergeUseRecordPositions);
+
+    doDelete(options, "003");
+    runComparison(mergeUseRecordPositions);
+  }
+
+  /**
+   * Compares file-group-reader results against non-file-group-reader results for both
+   * tables, first with the AND predicate and then with the OR predicate.
+   */
+  protected void runComparison(boolean mergeUseRecordPositions) {
+    compareDf(createDf(hudiBasePath, true, mergeUseRecordPositions), createDf(hudiBasePath, false, false));
+    compareDf(createDf(bootstrapTargetPath, true, mergeUseRecordPositions), createDf(bootstrapTargetPath, false, false));
+    compareDf(createDf2(hudiBasePath, true, mergeUseRecordPositions), createDf2(hudiBasePath, false, false));
+    compareDf(createDf2(bootstrapTargetPath, true, mergeUseRecordPositions), createDf2(bootstrapTargetPath, false, false));
+  }
+
+  /**
+   * Reads the table with a predicate that ANDs a data column condition with a record key
+   * condition.
+   */
+  protected Dataset<Row> createDf(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) {
+    return readFiltered(tableBasePath, fgReaderEnabled, mergeUseRecordPositions, AND_FILTER);
+  }
+
+  /**
+   * Reads the table with a predicate that ORs a data column condition with a record key
+   * condition.
+   */
+  protected Dataset<Row> createDf2(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) {
+    return readFiltered(tableBasePath, fgReaderEnabled, mergeUseRecordPositions, OR_FILTER);
+  }
+
+  /**
+   * Loads the Hudi table at {@code tableBasePath} with the given reader options, drops the
+   * {@code city_to_state} column and applies {@code filter}.
+   */
+  private Dataset<Row> readFiltered(String tableBasePath, Boolean fgReaderEnabled,
+                                    Boolean mergeUseRecordPositions, String filter) {
+    return sparkSession.read().format("hudi")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), fgReaderEnabled)
+        .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), mergeUseRecordPositions)
+        .load(tableBasePath)
+        .drop("city_to_state")
+        .where(filter);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 49277884eb1..747fcb9a2eb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -84,7 +84,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     val reader = sparkAdapter.createParquetFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]))
     val metaClient = 
HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
     val recordKeyField = new 
HoodieSparkRecordMerger().getMandatoryFieldsForMerging(metaClient.getTableConfig)(0)
-    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, 
Seq.empty)
+    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, 
Seq.empty, Seq.empty)
   }
 
   override def commitToTable(recordList: util.List[String], operation: String, 
options: util.Map[String, String]): Unit = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
new file mode 100644
index 00000000000..61b7b0ded04
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.testutils.HoodieTestTable
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, 
assertFalse}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/**
+ * Writes a small MERGE_ON_READ table with record positions enabled, then checks that the
+ * row index column can be read from the base file on Spark 3.5+ and that a read with
+ * position based merging enabled returns the expected rows.
+ */
+class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestHarness {
+  private val PARQUET_FORMAT = "parquet"
+  private val SPARK_MERGER = "org.apache.hudi.HoodieSparkRecordMerger"
+
+  /** Creates the four-row `user_to_country` table under `basePath` before each test. */
+  @BeforeEach
+  def setUp(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val userToCountryDF = Seq(
+      (1, "US", "1001"),
+      (2, "China", "1003"),
+      (3, "US", "1002"),
+      (4, "Singapore", "1004"))
+      .toDF("userid", "country", "ts")
+
+    // Create the file with record positions.
+    userToCountryDF.write.format("hudi")
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+      .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
+      .option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, SPARK_MERGER)
+      .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true")
+      .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key, PARQUET_FORMAT)
+      .option(
+        DataSourceWriteOptions.TABLE_TYPE.key(),
+        HoodieTableType.MERGE_ON_READ.name())
+      .save(basePath)
+  }
+
+  /**
+   * On Spark 3.5+, checks that the applied required schema has the row index column as its
+   * fourth field and that reading the first base file yields the row indices 0 to 3.
+   * On older Spark versions only the presence of a base file is asserted.
+   */
+  @Test
+  def testRecordPositionColumn(): Unit = {
+    val _spark = spark
+    // Prepare the schema
+    val dataSchema = new StructType(
+      Array(
+        StructField("userid", IntegerType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("ts", StringType, nullable = false)
+      )
+    )
+
+    // Prepare the file and Parquet file reader.
+    val vectorizedReaderKey = "spark.sql.parquet.enableVectorizedReader"
+    _spark.conf.set(vectorizedReaderKey, "false")
+
+    val hadoopConf = new Configuration(_spark.sparkContext.hadoopConfiguration)
+    val props = Map(vectorizedReaderKey -> "false")
+    val reader = sparkAdapter.createParquetFileReader(
+      vectorized = false, _spark.sessionState.conf, props, hadoopConf)
+
+    val metaClient = getHoodieMetaClient(
+      HadoopFSUtils.getStorageConfWithCopy(_spark.sparkContext.hadoopConfiguration), basePath)
+    val allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles
+    assertFalse(allBaseFiles.isEmpty)
+
+    val requiredSchema = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(
+      dataSchema,
+      new SparkFileFormatInternalRowReaderContext(reader, "userid", Seq.empty, Seq.empty)
+        .supportsParquetRowIndex)
+
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      // Confirm if the schema is as expected.
+      assertEquals(4, requiredSchema.fields.length)
+      assertEquals(
+        "StructField(_tmp_metadata_row_index,LongType,false)",
+        requiredSchema.fields(3).toString)
+
+      // Make sure we can read all the positions out from base file.
+      // Here we don't add filters since enabling filter push-down
+      // for parquet file is tricky.
+      val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+        .createPartitionedFile(
+          InternalRow.empty,
+          allBaseFiles.get(0).getPath,
+          0,
+          allBaseFiles.get(0).getLength)
+      val iterator = new CloseableInternalRowIterator(reader.read(fileInfo, requiredSchema,
+        StructType(Seq.empty), Seq.empty, new HadoopStorageConfiguration(hadoopConf)))
+      val rowIndices = Set.newBuilder[Long]
+      // Close the iterator even when reading a row fails.
+      try {
+        while (iterator.hasNext) {
+          // The row index is the fourth column, matching requiredSchema.fields(3) above.
+          rowIndices += iterator.next().getLong(3)
+        }
+      } finally {
+        iterator.close()
+      }
+      val expectedRowIndices: Set[Long] = Set(0L, 1L, 2L, 3L)
+      assertEquals(expectedRowIndices, rowIndices.result())
+    }
+  }
+
+  /**
+   * Reads the table through the Hudi datasource with position based merging enabled and
+   * checks the user ids returned for the `country = 'US'` filter.
+   */
+  @Test
+  def testUseFileGroupReaderDirectly(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    // Read the records out with record positions.
+    val allRecords = _spark.read.format("hudi")
+      .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true")
+      .load(basePath)
+
+    // Ensure the number of outcomes are correct for all Spark versions
+    // including Spark3.5. The ids are sorted because the read applies no ordering.
+    val usRecords = allRecords
+      .select("userid")
+      .filter("country = 'US'").map(_.getInt(0)).collect().sorted
+    assertArrayEquals(Array[Int](1, 3), usRecords)
+  }
+}

Reply via email to