This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7964202a751d refactor: Add Lombok Builders (#17781)
7964202a751d is described below

commit 7964202a751dc24d65fa0bcd1f300278a453e6bd
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 19:08:47 2026 +0800

    refactor: Add Lombok Builders (#17781)
    
    - Add Lombok Builder to HoodieFileGroupReader
    - Add Lombok Builder to InputSplit
    - Add Lombok Builder to ReaderParameters
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   6 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    |   8 +-
 .../hudi/io/FileGroupReaderBasedAppendHandle.java  |  20 +-
 .../hudi/io/FileGroupReaderBasedMergeHandle.java   |   6 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   8 +-
 .../SecondaryIndexRecordGenerationUtils.java       |  10 +-
 .../strategy/ClusteringExecutionStrategy.java      |  17 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   2 +-
 .../client/utils/SparkMetadataWriterUtils.java     |   2 +-
 .../client/TestSparkRDDMetadataWriteClient.java    |   6 +-
 .../functional/TestHoodieBackedTableMetadata.java  |   2 +-
 .../common/table/read/HoodieFileGroupReader.java   | 296 +++++++--------------
 .../apache/hudi/common/table/read/InputSplit.java  |  64 +++--
 .../hudi/common/table/read/ReaderParameters.java   |  82 ++----
 .../buffer/DefaultFileGroupRecordBufferLoader.java |   6 +-
 .../read/buffer/LogScanningRecordBufferLoader.java |   2 +-
 .../ReusableFileGroupRecordBufferLoader.java       |   2 +-
 .../StreamingFileGroupRecordBufferLoader.java      |   4 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   6 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   8 +-
 .../table/read/TestHoodieFileGroupReaderBase.java  |   8 +-
 .../read/buffer/BaseTestFileGroupRecordBuffer.java |   2 +-
 .../buffer/TestFileGroupRecordBufferLoader.java    |   4 +-
 .../TestSortedKeyBasedFileGroupRecordBuffer.java   |   2 +-
 .../org/apache/hudi/table/format/FormatUtils.java  |   8 +-
 .../reader/HoodieFileGroupReaderTestHarness.java   |   7 +-
 .../HoodieFileGroupReaderBasedRecordReader.java    |   6 +-
 .../org/apache/hudi/HoodieMergeOnReadRDDV2.scala   |   8 +-
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |   8 +-
 .../HoodieFileGroupReaderBasedFileFormat.scala     |   8 +-
 .../procedures/PartitionBucketIndexManager.scala   |   8 +-
 .../TestMetadataUtilRLIandSIRecordGeneration.java  |   2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |   2 +-
 33 files changed, 279 insertions(+), 351 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 6138e1fb3839..d6d2546e2cb1 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -242,10 +242,12 @@ public class HoodieLogFileCommand {
           Option.empty(),
           Option.empty(),
           fileGroupReaderProperties);
-      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
-          .withFileSlice(fileSlice)
+          .withBaseFileOption(fileSlice.getBaseFile())
+          .withLogFiles(fileSlice.getLogFiles())
+          .withPartitionPath(fileSlice.getPartitionPath())
           .withDataSchema(readerSchema)
           .withRequestedSchema(readerSchema)
           
.withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d9fe1068e421..d2cc530295f2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -319,14 +319,16 @@ public class HoodieIndexUtils {
       Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(config.getInternalSchema());
       FileSlice fileSlice = fileSliceOption.get();
       HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
-      HoodieFileGroupReader<R> fileGroupReader = 
HoodieFileGroupReader.<R>newBuilder()
+      HoodieFileGroupReader<R> fileGroupReader = 
HoodieFileGroupReader.<R>builder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(metaClient)
           .withLatestCommitTime(instantTime.get())
-          .withFileSlice(fileSlice)
+          .withBaseFileOption(fileSlice.getBaseFile())
+          .withLogFiles(fileSlice.getLogFiles())
+          .withPartitionPath(fileSlice.getPartitionPath())
           .withDataSchema(dataSchema)
           .withRequestedSchema(dataSchema)
-          .withInternalSchema(internalSchemaOption)
+          .withInternalSchemaOpt(internalSchemaOption)
           .withProps(metaClient.getTableConfig().getProps())
           .build();
       try {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
index a081709f6fc2..c40ce0158a3c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
@@ -82,10 +82,20 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O> 
extends HoodieAppendHa
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
             config.getBasePath(), operation.getPartitionPath()), 
logFileName)));
     // Initializes the record iterator, log compaction requires writing the 
deletes into the delete block of the resulting log file.
-    try (HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        
.withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields)
-        
.withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
-        
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
+    try (HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>builder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(hoodieTable.getMetaClient())
+        .withLatestCommitTime(instantTime)
+        .withPartitionPath(partitionPath)
+        .withLogFiles(logFiles)
+        .withBaseFileOption(Option.empty())
+        .withDataSchema(writeSchemaWithMetaFields)
+        .withRequestedSchema(writeSchemaWithMetaFields)
+        .withInternalSchemaOpt(internalSchemaOption)
+        .withProps(props)
+        .withEmitDelete(true)
+        .withShouldUseRecordPosition(usePosition)
+        .withSortOutput(hoodieTable.requireSortedRecords())
         // instead of using config.enableOptimizedLogBlocksScan(), we set to 
true as log compaction blocks only supported in scanV2
         .build()) {
       recordItr = new 
CloseableMappingIterator<>(fileGroupReader.getLogRecordsOnly(), record -> {
@@ -96,7 +106,7 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O> 
extends HoodieAppendHa
       header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES,
           StringUtils.join(fileGroupReader.getValidBlockInstants(), ","));
       super.doAppend();
-      this.readStats = fileGroupReader.getStats();
+      this.readStats = fileGroupReader.getReadStats();
     } catch (IOException e) {
       throw new HoodieIOException("Failed to initialize file group reader for 
" + fileId, e);
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index d708c15f3384..2893913a0d13 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -301,7 +301,7 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieWriteMerg
 
         // The stats of inserts, updates, and deletes are updated once at the 
end
         // These will be set in the write stat when closing the merge handle
-        this.readStats = fileGroupReader.getStats();
+        this.readStats = fileGroupReader.getReadStats();
         this.insertRecordsWritten = readStats.getNumInserts();
         this.updatedRecordsWritten = readStats.getNumUpdates();
         this.recordsDeleted = readStats.getNumDeletes();
@@ -318,10 +318,10 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O> 
extends HoodieWriteMerg
 
   private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, 
Option<InternalSchema> internalSchemaOption, TypedProperties props,
                                                         
Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> 
incomingRecordsItr) {
-    HoodieFileGroupReader.Builder<T> fileGroupBuilder = 
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
+    HoodieFileGroupReader.HoodieFileGroupReaderBuilder<T> fileGroupBuilder = 
HoodieFileGroupReader.<T>builder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
         
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
         
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
-        .withInternalSchema(internalSchemaOption).withProps(props)
+        .withInternalSchemaOpt(internalSchemaOption).withProps(props)
         
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
         .withFileGroupUpdateCallback(createCallback());
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5cdd20f7eb5c..ce5a4eeb0a2f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -997,14 +997,16 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
       HoodieSchema requestedSchema = 
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
           : HoodieSchemaUtils.projectSchema(dataSchema, 
Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new 
String[0])));
       Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
-      HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+      HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>builder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(metaClient)
-          .withFileSlice(fileSlice)
+          .withBaseFileOption(fileSlice.getBaseFile())
+          .withLogFiles(fileSlice.getLogFiles())
+          .withPartitionPath(fileSlice.getPartitionPath())
           .withLatestCommitTime(instantTime.get())
           .withDataSchema(dataSchema)
           .withRequestedSchema(requestedSchema)
-          .withInternalSchema(internalSchemaOption)
+          .withInternalSchemaOpt(internalSchemaOption)
           .withShouldUseRecordPosition(false)
           .withProps(metaClient.getTableConfig().getProps())
           .build();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 9cb9b8ca7df5..b86688b81e60 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -123,7 +123,7 @@ public class SecondaryIndexRecordGenerationUtils {
       // validate that for a given fileId, either we have 1 parquet file or N 
log files.
       AtomicInteger totalParquetFiles = new AtomicInteger();
       AtomicInteger totalLogFiles = new AtomicInteger();
-      writeStats.stream().forEach(writeStat -> {
+      writeStats.forEach(writeStat -> {
         if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath()))) 
{
           totalLogFiles.getAndIncrement();
         } else {
@@ -156,7 +156,7 @@ public class SecondaryIndexRecordGenerationUtils {
         } else { // log files are added in current commit
           // add new log files to existing latest file slice and compute the 
secondary index to primary key mapping.
           FileSlice latestFileSlice = fileSliceOption.get();
-          writeStats.stream().forEach(writeStat -> {
+          writeStats.forEach(writeStat -> {
             StoragePathInfo logFile = new StoragePathInfo(new 
StoragePath(basePath, writeStat.getPath()), writeStat.getFileSizeInBytes(), 
false, (short) 0, 0, 0);
             latestFileSlice.addLogFile(new HoodieLogFile(logFile));
           });
@@ -286,9 +286,11 @@ public class SecondaryIndexRecordGenerationUtils {
                                                                                
                 boolean allowInflightInstants) throws IOException {
     String secondaryKeyField = indexDefinition.getSourceFieldsKey();
     HoodieSchema requestedSchema = 
getRequestedSchemaForSecondaryIndex(metaClient, tableSchema, secondaryKeyField);
-    HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+    HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>builder()
         .withReaderContext(readerContext)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withHoodieTableMetaClient(metaClient)
         .withProps(props)
         .withLatestCommitTime(instantTime)
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d34a85a97bd4..cc3c4a02eb22 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -142,9 +142,18 @@ public abstract class ClusteringExecutionStrategy<T, I, K, 
O> implements Seriali
                                                                    
ReaderContextFactory<R> readerContextFactory, String instantTime,
                                                                    
TypedProperties properties, boolean usePosition) {
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
-    return HoodieFileGroupReader.<R>newBuilder()
-        
.withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
-        
.withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-        
.withShouldUseRecordPosition(usePosition).withProps(properties).build();
+    return HoodieFileGroupReader.<R>builder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(instantTime)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
+        .withDataSchema(readerSchema)
+        .withRequestedSchema(readerSchema)
+        .withInternalSchemaOpt(internalSchemaOption)
+        .withShouldUseRecordPosition(usePosition)
+        .withProps(properties)
+        .build();
   }
 }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 9563439c39eb..feda056472dc 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -946,7 +946,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
     HoodieAvroReaderContext readerContext = new 
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), 
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty(),
         new TypedProperties());
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLogFiles(logFiles.stream())
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 47155899e204..2184c24ad136 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -335,7 +335,7 @@ public class SparkMetadataWriterUtils {
       baseFileOption = Option.empty();
       logFileStream = Stream.of(new HoodieLogFile(filePath));
     }
-    HoodieFileGroupReader<InternalRow> fileGroupReader = 
HoodieFileGroupReader.<InternalRow>newBuilder()
+    HoodieFileGroupReader<InternalRow> fileGroupReader = 
HoodieFileGroupReader.<InternalRow>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withDataSchema(tableSchema)
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index 8a88dd813588..65e45ce96f36 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -218,10 +218,12 @@ public class TestSparkRDDMetadataWriteClient extends 
HoodieClientTestBase {
     HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(metadataSchema);
 
     HoodieAvroReaderContext readerContext = new 
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), 
metadataMetaClient.getTableConfig(), Option.of(instantRange), 
Option.of(predicate));
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withLatestCommitTime(validMetadataInstant)
         .withRequestedSchema(metadataSchema)
         .withDataSchema(schema)
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 6bb71f07de61..fcbd689f7447 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -560,7 +560,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataMergedRecords(HoodieTableMetaClient 
metadataMetaClient, List<HoodieLogFile> logFiles, String latestCommitTimestamp, 
HoodieWriteConfig metadataTableWriteConfig) {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
     HoodieAvroReaderContext readerContext = new 
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), 
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLogFiles(logFiles.stream())
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2b35eaff4f30..0b536b511db1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -35,7 +34,6 @@ import org.apache.hudi.common.table.PartitionPathParser;
 import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
 import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -49,6 +47,10 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -67,7 +69,9 @@ import java.util.stream.Stream;
  * @param <T> The type of engine-specific record representation, e.g.,{@code 
InternalRow}
  *            in Spark and {@code RowData} in Flink.
  */
+@AllArgsConstructor
 public final class HoodieFileGroupReader<T> implements Closeable {
+
   private final HoodieReaderContext<T> readerContext;
   private final HoodieTableMetaClient metaClient;
   private final InputSplit inputSplit;
@@ -81,25 +85,113 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private HoodieFileGroupRecordBuffer<T> recordBuffer;
   private ClosableIterator<T> baseFileIterator;
   private final Option<UnaryOperator<T>> outputConverter;
+  @Getter
   private final HoodieReadStats readStats;
   // Callback to run custom logic on updates to the base files for the file 
group
   private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
   // The list of instant times read from the log blocks, this value is used by 
the log-compaction to allow optimized log-block scans
+  @Getter
   private List<String> validBlockInstants = Collections.emptyList();
   private BufferedRecordConverter<T> bufferedRecordConverter;
 
-  private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, 
HoodieStorage storage, String tablePath,
-                                String latestCommitTime, HoodieSchema 
dataSchema, HoodieSchema requestedSchema,
-                                Option<InternalSchema> internalSchemaOpt, 
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
-                                ReaderParameters readerParameters, InputSplit 
inputSplit, Option<BaseFileUpdateCallback<T>> updateCallback,
-                                FileGroupRecordBufferLoader<T> 
recordBufferLoader) {
+  @Builder(setterPrefix = "with")
+  private HoodieFileGroupReader(
+      HoodieReaderContext<T> readerContext,
+      String latestCommitTime,
+      HoodieSchema dataSchema,
+      HoodieSchema requestedSchema,
+      Option<InternalSchema> internalSchemaOpt,
+      HoodieTableMetaClient hoodieTableMetaClient,
+      TypedProperties props,
+      Option<HoodieBaseFile> baseFileOption,
+      Stream<HoodieLogFile> logFiles,
+      String partitionPath,
+      Long start,
+      Long length,
+      Iterator<? extends HoodieRecord> recordIterator,
+      Boolean shouldUseRecordPosition,
+      Boolean allowInflightInstants,
+      Boolean emitDelete,
+      Boolean sortOutput,
+      Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback,
+      FileGroupRecordBufferLoader<T> recordBufferLoader) {
+
+    // Validations
+    ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
+    ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table 
meta client is required");
+    ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit 
time is required");
+    ValidationUtils.checkArgument(dataSchema != null, "Data schema is 
required");
+    ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
+    ValidationUtils.checkArgument(props != null, "Props is required");
+    ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
+
+    // Handle defaults
+    if (internalSchemaOpt == null) {
+      internalSchemaOpt = Option.empty();
+    }
+    if (baseFileOption == null) {
+      baseFileOption = Option.empty();
+    }
+    if (start == null) {
+      start = 0L;
+    }
+    if (length == null) {
+      length = Long.MAX_VALUE;
+    }
+    if (shouldUseRecordPosition == null) {
+      shouldUseRecordPosition = false;
+    }
+    if (allowInflightInstants == null) {
+      allowInflightInstants = false;
+    }
+    if (emitDelete == null) {
+      emitDelete = false;
+    }
+    if (sortOutput == null) {
+      sortOutput = false;
+    }
+    if (fileGroupUpdateCallback == null) {
+      fileGroupUpdateCallback = Option.empty();
+    }
+
+    // Derive tablePath
+    String tablePath = hoodieTableMetaClient.getBasePath().toString();
+
+    // Set the storage with the readerContext's storage configuration
+    HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new 
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+    // Handle recordBufferLoader default
+    if (recordBufferLoader == null) {
+      if (recordIterator != null) {
+        recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+      } else {
+        recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
+      }
+    }
+
+    // Build composite objects using static helpers
+    this.readerParameters = ReaderParameters.builder()
+        .shouldUseRecordPosition(shouldUseRecordPosition)
+        .emitDeletes(emitDelete)
+        .sortOutputs(sortOutput)
+        .inflightInstantsAllowed(allowInflightInstants)
+        .build();
+    this.inputSplit = InputSplit.builder()
+        .baseFileOption(baseFileOption)
+        .logFileStream(logFiles)
+        .recordIterator((Iterator<HoodieRecord>) recordIterator)
+        .partitionPath(partitionPath)
+        .start(start)
+        .length(length)
+        .build();
+
+    // Initialize fields
     this.readerContext = readerContext;
     this.recordBufferLoader = recordBufferLoader;
-    this.fileGroupUpdateCallback = updateCallback;
+    this.fileGroupUpdateCallback = fileGroupUpdateCallback;
     this.metaClient = hoodieTableMetaClient;
     this.storage = storage;
-    this.readerParameters = readerParameters;
-    this.inputSplit = inputSplit;
+
     readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());
     
readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath());
     if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) {
@@ -112,7 +204,7 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     readerContext.setTablePath(tablePath);
     readerContext.setLatestCommitTime(latestCommitTime);
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, 
HoodieReaderConfig.MERGE_TYPE, 
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
-    
readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition()
 && !isSkipMerge && readerContext.getHasLogFiles() && 
inputSplit.isParquetBaseFile());
+    
readerContext.setShouldMergeUseRecordPosition(readerParameters.shouldUseRecordPosition()
 && !isSkipMerge && readerContext.getHasLogFiles() && 
inputSplit.isParquetBaseFile());
     
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
     
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
         ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient)
@@ -248,13 +340,6 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     }
   }
 
-  /**
-   * @return statistics of reading a file group.
-   */
-  public HoodieReadStats getStats() {
-    return readStats;
-  }
-
   /**
    * @return The next record after calling {@link #hasNext}.
    */
@@ -266,10 +351,6 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     return nextVal;
   }
 
-  public List<String> getValidBlockInstants() {
-    return validBlockInstants;
-  }
-
   /**
    * Notifies a write failure with the given record key.
    */
@@ -355,175 +436,4 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       }
     }
   }
-
-  public static <T> Builder<T> newBuilder() {
-    return new Builder<>();
-  }
-
-  public static class Builder<T> {
-    private HoodieReaderContext<T> readerContext;
-    private HoodieStorage storage;
-    private String tablePath;
-    private String latestCommitTime;
-    private HoodieSchema dataSchema;
-    private HoodieSchema requestedSchema;
-    private Option<InternalSchema> internalSchemaOpt = Option.empty();
-    private HoodieTableMetaClient hoodieTableMetaClient;
-    private TypedProperties props;
-    private Option<HoodieBaseFile> baseFileOption;
-    private Stream<HoodieLogFile> logFiles;
-    private String partitionPath;
-    private long start = 0;
-    private long length = Long.MAX_VALUE;
-    private Iterator<HoodieRecord> recordIterator;
-    private boolean shouldUseRecordPosition = false;
-    private boolean allowInflightInstants = false;
-    private boolean emitDelete;
-    private boolean sortOutput = false;
-    private Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback = 
Option.empty();
-    private FileGroupRecordBufferLoader<T> recordBufferLoader;
-
-    public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
-      this.readerContext = readerContext;
-      return this;
-    }
-
-    public Builder<T> withLatestCommitTime(String latestCommitTime) {
-      this.latestCommitTime = latestCommitTime;
-      return this;
-    }
-
-    public Builder<T> withFileSlice(FileSlice fileSlice) {
-      this.baseFileOption = fileSlice.getBaseFile();
-      this.logFiles = fileSlice.getLogFiles();
-      this.partitionPath = fileSlice.getPartitionPath();
-      return this;
-    }
-
-    public Builder<T> withBaseFileOption(Option<HoodieBaseFile> 
baseFileOption) {
-      this.baseFileOption = baseFileOption;
-      return this;
-    }
-
-    public Builder<T> withLogFiles(Stream<HoodieLogFile> logFiles) {
-      this.logFiles = logFiles;
-      return this;
-    }
-
-    public Builder<T> withRecordIterator(Iterator<? extends HoodieRecord> 
recordIterator) {
-      this.recordIterator = (Iterator<HoodieRecord>) recordIterator;
-      this.recordBufferLoader = 
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
-      return this;
-    }
-
-    public Builder<T> withPartitionPath(String partitionPath) {
-      this.partitionPath = partitionPath;
-      return this;
-    }
-
-    public Builder<T> withDataSchema(HoodieSchema dataSchema) {
-      this.dataSchema = dataSchema;
-      return this;
-    }
-
-    public Builder<T> withRequestedSchema(HoodieSchema requestedSchema) {
-      this.requestedSchema = requestedSchema;
-      return this;
-    }
-
-    public Builder<T> withInternalSchema(Option<InternalSchema> 
internalSchemaOpt) {
-      this.internalSchemaOpt = internalSchemaOpt;
-      return this;
-    }
-
-    public Builder<T> withHoodieTableMetaClient(HoodieTableMetaClient 
hoodieTableMetaClient) {
-      this.hoodieTableMetaClient = hoodieTableMetaClient;
-      this.tablePath = hoodieTableMetaClient.getBasePath().toString();
-      return this;
-    }
-
-    public Builder<T> withProps(TypedProperties props) {
-      this.props = props;
-      return this;
-    }
-
-    public Builder<T> withStart(long start) {
-      this.start = start;
-      return this;
-    }
-
-    public Builder<T> withLength(long length) {
-      this.length = length;
-      return this;
-    }
-
-    public Builder<T> withShouldUseRecordPosition(boolean 
shouldUseRecordPosition) {
-      this.shouldUseRecordPosition = shouldUseRecordPosition;
-      return this;
-    }
-
-    public Builder<T> withAllowInflightInstants(boolean allowInflightInstants) 
{
-      this.allowInflightInstants = allowInflightInstants;
-      return this;
-    }
-
-    public Builder<T> withEmitDelete(boolean emitDelete) {
-      this.emitDelete = emitDelete;
-      return this;
-    }
-
-    public Builder<T> 
withFileGroupUpdateCallback(Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback) {
-      this.fileGroupUpdateCallback = fileGroupUpdateCallback;
-      return this;
-    }
-
-    /**
-     * If true, the output of the merge will be sorted instead of appending 
log records to end of the iterator if they do not have matching keys in the 
base file.
-     * This assumes that the base file is already sorted by key.
-     * @param sortOutput whether to sort the output iterator
-     * @return this builder instance
-     */
-    public Builder<T> withSortOutput(boolean sortOutput) {
-      this.sortOutput = sortOutput;
-      return this;
-    }
-
-    public Builder<T> withRecordBufferLoader(FileGroupRecordBufferLoader<T> 
recordBufferLoader) {
-      this.recordBufferLoader = recordBufferLoader;
-      return this;
-    }
-
-    public HoodieFileGroupReader<T> build() {
-      ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
-      ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie 
table meta client is required");
-      ValidationUtils.checkArgument(tablePath != null, "Table path is 
required");
-      // set the storage with the readerContext's storage configuration
-      this.storage = hoodieTableMetaClient.getStorage().newInstance(new 
StoragePath(tablePath), readerContext.getStorageConfiguration());
-
-      ValidationUtils.checkArgument(storage != null, "Storage is required");
-      ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit 
time is required");
-      ValidationUtils.checkArgument(dataSchema != null, "Data schema is 
required");
-      ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
-      ValidationUtils.checkArgument(props != null, "Props is required");
-      ValidationUtils.checkArgument(baseFileOption != null, "Base file option 
is required");
-      ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
-
-      if (recordBufferLoader == null) {
-        recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
-      }
-
-      ReaderParameters readerParameters = ReaderParameters.builder()
-          .shouldUseRecordPosition(shouldUseRecordPosition)
-          .emitDeletes(emitDelete)
-          .sortOutputs(sortOutput)
-          .allowInflightInstants(allowInflightInstants)
-          .build();
-      InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator != 
null ? Either.right(recordIterator) : Either.left(logFiles == null ? 
Stream.empty() : logFiles),
-          partitionPath, start, length);
-      return new HoodieFileGroupReader<>(
-          readerContext, storage, tablePath, latestCommitTime, dataSchema, 
requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
-          props, readerParameters, inputSplit, fileGroupUpdateCallback, 
recordBufferLoader);
-    }
-  }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index 6d7b578e15b3..a0f1a3d04e90 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -24,10 +24,13 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.util.Either;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -38,9 +41,13 @@ import java.util.stream.Stream;
  * Represents a split of input data for reading, which includes the partition 
path the data belongs to along with an optional base file and a list of log 
files.
  * If there is only a base file, it is possible for the reader to specify a 
particular range of the file with the start and length parameters.
  */
+@Getter
 public class InputSplit {
+
   private final Option<HoodieBaseFile> baseFileOption;
+  @Getter(AccessLevel.NONE)
   private final List<HoodieLogFile> logFiles;
+  @Getter(AccessLevel.NONE)
   private final Option<Iterator<HoodieRecord>> recordIterator;
   private final String partitionPath;
   // Byte offset to start reading from the base file
@@ -48,26 +55,41 @@ public class InputSplit {
   // Length of bytes to read from the base file
   private final long length;
 
-  InputSplit(Option<HoodieBaseFile> baseFileOption,
-             Either<Stream<HoodieLogFile>, Iterator<HoodieRecord>> 
recordsToMerge,
-             String partitionPath, long start, long length) {
-    this.baseFileOption = baseFileOption;
-    if (recordsToMerge.isLeft()) {
-      this.logFiles = 
recordsToMerge.asLeft().sorted(HoodieLogFile.getLogFileComparator())
+  @Builder
+  private InputSplit(
+      Option<HoodieBaseFile> baseFileOption,
+      Stream<HoodieLogFile> logFileStream,
+      Iterator<HoodieRecord> recordIterator,
+      String partitionPath,
+      long start,
+      long length) {
+
+    // Ensure we do not have both sources of data to merge
+    // i.e. logFileStream and recordIterator cannot be both non-null
+    ValidationUtils.checkArgument(!(logFileStream != null && recordIterator != 
null),
+        "Cannot provide both logFileStream and recordIterator");
+
+    this.baseFileOption = 
Option.ofNullable(baseFileOption).orElse(Option.empty());
+    this.partitionPath = partitionPath;
+    this.start = start;
+    this.length = length;
+
+    if (logFileStream != null) {
+      // Process Log Files (if provided)
+      this.logFiles = logFileStream
+          .sorted(HoodieLogFile.getLogFileComparator())
           .filter(logFile -> 
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
           .collect(Collectors.toList());
       this.recordIterator = Option.empty();
+    } else if (recordIterator != null) {
+      // Process Record Iterator (if provided)
+      this.logFiles = Collections.emptyList();
+      this.recordIterator = Option.of(recordIterator);
     } else {
+      // Handle Case with neither (Base file only read)
       this.logFiles = Collections.emptyList();
-      this.recordIterator = Option.of(recordsToMerge.asRight());
+      this.recordIterator = Option.empty();
     }
-    this.partitionPath = partitionPath;
-    this.start = start;
-    this.length = length;
-  }
-
-  public Option<HoodieBaseFile> getBaseFileOption() {
-    return baseFileOption;
   }
 
   public List<HoodieLogFile> getLogFiles() {
@@ -79,18 +101,6 @@ public class InputSplit {
     return !logFiles.isEmpty();
   }
 
-  public String getPartitionPath() {
-    return partitionPath;
-  }
-
-  public long getStart() {
-    return start;
-  }
-
-  public long getLength() {
-    return length;
-  }
-
   public boolean isParquetBaseFile() {
     return baseFileOption.map(baseFile -> 
HoodieFileFormat.fromFileExtension(baseFile.getStoragePath().getFileExtension())
 == HoodieFileFormat.PARQUET).orElse(false);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
index bfb791fe5fa7..d19ad39418b3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
@@ -19,78 +19,38 @@
 
 package org.apache.hudi.common.table.read;
 
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+
 /**
  * Parameters for how the reader should process the FileGroup while reading.
  */
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@Builder
 public class ReaderParameters {
+
   // Rely on the position of the record in the file instead of the record keys 
while merging data between base and log files
-  private final boolean useRecordPosition;
+  @Getter(AccessLevel.NONE)
+  @Builder.Default
+  private final boolean shouldUseRecordPosition = false;
   // Whether to emit delete records while reading
-  private final boolean emitDelete;
+  @Builder.Default
+  private final boolean emitDeletes = false;
   // Whether to sort the output records while reading, this implicitly 
requires the base file to be sorted
-  private final boolean sortOutput;
+  @Builder.Default
+  private final boolean sortOutputs = false;
   // Allows to consider inflight instants while merging log records using 
HoodieMergedLogRecordReader
   // The inflight instants need to be considered while updating RLI records. 
RLI needs to fetch the revived
   // and deleted keys from the log files written as part of active data 
commit. During the RLI update,
-  // the allowInflightInstants flag would need to be set to true. This would 
ensure the HoodieMergedLogRecordReader
+  // the inflightInstantsAllowed flag would need to be set to true. This would 
ensure the HoodieMergedLogRecordReader
   // considers the log records which are inflight.
-  private final boolean allowInflightInstants;
-
-  private ReaderParameters(boolean useRecordPosition, boolean emitDelete, 
boolean sortOutput, boolean allowInflightInstants) {
-    this.useRecordPosition = useRecordPosition;
-    this.emitDelete = emitDelete;
-    this.sortOutput = sortOutput;
-    this.allowInflightInstants = allowInflightInstants;
-  }
-
-  public boolean useRecordPosition() {
-    return useRecordPosition;
-  }
-
-  public boolean emitDeletes() {
-    return emitDelete;
-  }
-
-  public boolean sortOutputs() {
-    return sortOutput;
-  }
-
-  public boolean allowInflightInstants() {
-    return allowInflightInstants;
-  }
-
-  static Builder builder() {
-    return new Builder();
-  }
-
-  static class Builder {
-    private boolean shouldUseRecordPosition = false;
-    private boolean emitDelete = false;
-    private boolean sortOutput = false;
-    private boolean allowInflightInstants = false;
-
-    public Builder shouldUseRecordPosition(boolean shouldUseRecordPosition) {
-      this.shouldUseRecordPosition = shouldUseRecordPosition;
-      return this;
-    }
-
-    public Builder emitDeletes(boolean emitDelete) {
-      this.emitDelete = emitDelete;
-      return this;
-    }
-
-    public Builder sortOutputs(boolean sortOutput) {
-      this.sortOutput = sortOutput;
-      return this;
-    }
-
-    public Builder allowInflightInstants(boolean allowInflightInstants) {
-      this.allowInflightInstants = allowInflightInstants;
-      return this;
-    }
+  @Builder.Default
+  private final boolean inflightInstantsAllowed = false;
 
-    public ReaderParameters build() {
-      return new ReaderParameters(shouldUseRecordPosition, emitDelete, 
sortOutput, allowInflightInstants);
-    }
+  public boolean shouldUseRecordPosition() {
+    return shouldUseRecordPosition;
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index 3e0609003ad3..e84921c080ae 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -62,15 +62,15 @@ class DefaultFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBufferLoade
                                                                             
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
     boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, 
HoodieReaderConfig.MERGE_TYPE, 
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
     Option<PartialUpdateMode> partialUpdateModeOpt = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
-    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, 
props);
     FileGroupRecordBuffer<T> recordBuffer;
     if (isSkipMerge) {
       recordBuffer = new UnmergedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, readStats);
-    } else if (readerParameters.sortOutputs()) {
+    } else if (readerParameters.isSortOutputs()) {
       recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
-    } else if (readerParameters.useRecordPosition() && 
inputSplit.getBaseFileOption().isPresent()) {
+    } else if (readerParameters.shouldUseRecordPosition() && 
inputSplit.getBaseFileOption().isPresent()) {
       recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, inputSplit.getBaseFileOption().get().getCommitTime(), 
props,
           orderingFieldNames, updateProcessor);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
index b6638b1ecd04..1c4f6082e4f1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
@@ -48,7 +48,7 @@ abstract class LogScanningRecordBufferLoader {
         .withInstantRange(readerContext.getInstantRange())
         .withPartition(inputSplit.getPartitionPath())
         .withRecordBuffer(recordBuffer)
-        .withAllowInflightInstants(readerParameters.allowInflightInstants())
+        
.withAllowInflightInstants(readerParameters.isInflightInstantsAllowed())
         .withMetaClient(hoodieTableMetaClient)
         .build()) {
       
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
index aa3fbd22cf94..bd3b3ec55bb3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
@@ -58,7 +58,7 @@ public class ReusableFileGroupRecordBufferLoader<T> extends 
LogScanningRecordBuf
                                                                                
          ReaderParameters readerParameters,
                                                                                
          HoodieReadStats readStats,
                                                                                
          Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
-    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, 
props);
     Option<PartialUpdateMode> partialUpdateModeOpt = 
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
     if (cachedResults == null) {
       // Create an initial buffer to process the log files
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index 02c1d5b4cb9c..2087b7dceaf8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -67,9 +67,9 @@ public class StreamingFileGroupRecordBufferLoader<T> 
implements FileGroupRecordB
     readerContext.getSchemaHandler().setSchemaForUpdates(recordSchema);
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
     Option<PartialUpdateMode> partialUpdateModeOpt = 
tableConfig.getPartialUpdateMode();
-    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+    UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats, 
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, 
props);
     FileGroupRecordBuffer<T> recordBuffer;
-    if (readerParameters.sortOutputs()) {
+    if (readerParameters.isSortOutputs()) {
       recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, readerContext.getMergeMode(), 
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
     } else {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 39db5be71dcc..a7e53e226033 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -601,11 +601,13 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         baseFileReaders,
         fileGroupReaderProps);
 
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLatestCommitTime(latestMetadataInstantTime)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(SCHEMA)
         .withRequestedSchema(SCHEMA)
         .withProps(fileGroupReaderProps)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 59e2d6dcd83f..0ede3ae5015e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1809,7 +1809,7 @@ public class HoodieTableMetadataUtil {
       properties.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), 
REALTIME_SKIP_MERGE);
       // Currently only avro is fully supported for extracting column ranges 
(see HUDI-8585)
       HoodieReaderContext readerContext = new 
HoodieAvroReaderContext(datasetMetaClient.getStorageConf(), 
datasetMetaClient.getTableConfig(), Option.empty(), Option.empty());
-      HoodieFileGroupReader fileGroupReader = 
HoodieFileGroupReader.newBuilder()
+      HoodieFileGroupReader fileGroupReader = HoodieFileGroupReader.builder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(datasetMetaClient)
           .withLogFiles(Stream.of(logFile))
@@ -2568,10 +2568,12 @@ public class HoodieTableMetadataUtil {
       final String partition = partitionAndBaseFile.getKey();
       final FileSlice fileSlice = partitionAndBaseFile.getValue();
       if (!fileSlice.getBaseFile().isPresent()) {
-        HoodieFileGroupReader fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+        HoodieFileGroupReader fileGroupReader = 
HoodieFileGroupReader.<T>builder()
             .withReaderContext(readerContextFactory.getContext())
             .withHoodieTableMetaClient(metaClient)
-            .withFileSlice(fileSlice)
+            .withBaseFileOption(fileSlice.getBaseFile())
+            .withLogFiles(fileSlice.getLogFiles())
+            .withPartitionPath(fileSlice.getPartitionPath())
             .withDataSchema(tableSchema)
             .withRequestedSchema(HoodieSchemaUtils.getRecordKeySchema())
             .withLatestCommitTime(latestCommitTime)
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index ffcda26a3796..158ef475d843 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -1040,15 +1040,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                             HoodieSchema 
schema,
                                                             FileSlice 
fileSlice,
                                                             int start, 
TypedProperties props, boolean sortOutput) {
-    return HoodieFileGroupReader.<T>newBuilder()
+    return HoodieFileGroupReader.<T>builder()
         .withReaderContext(getHoodieReaderContext(tablePath, schema, 
storageConf, metaClient))
         .withHoodieTableMetaClient(metaClient)
         
.withLatestCommitTime(metaClient.getActiveTimeline().lastInstant().get().requestedTime())
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(schema)
         .withRequestedSchema(schema)
         .withProps(props)
-        .withStart(start)
+        .withStart((long) start)
         .withLength(fileSlice.getTotalFileSize())
         .withShouldUseRecordPosition(false)
         .withAllowInflightInstants(false)
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index e6ee09b3e6ad..b2869df3310d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -149,7 +149,7 @@ public class BaseTestFileGroupRecordBuffer {
       when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
       
when(inputSplit.getRecordIterator()).thenReturn(fileGroupRecordBufferItrOpt.get());
       ReaderParameters readerParameters = mock(ReaderParameters.class);
-      when(readerParameters.sortOutputs()).thenReturn(false);
+      when(readerParameters.isSortOutputs()).thenReturn(false);
       return (KeyBasedFileGroupRecordBuffer<IndexedRecord>) 
recordBufferLoader.getRecordBuffer(readerContext, mockMetaClient.getStorage(), 
inputSplit,
           orderingFieldNames, mockMetaClient, props, readerParameters, 
readStats, Option.empty()).getKey();
     }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
index 51dad1c752af..e1c810c56910 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -90,12 +90,12 @@ public class TestFileGroupRecordBufferLoader extends 
BaseTestFileGroupRecordBuff
     }
     ReaderParameters readerParameters = mock(ReaderParameters.class);
     if (fileGroupRecordBufferType.contains("Sorted")) {
-      when(readerParameters.sortOutputs()).thenReturn(true);
+      when(readerParameters.isSortOutputs()).thenReturn(true);
     }
     if (fileGroupRecordBufferType.contains("Position")) {
       HoodieBaseFile baseFile = mock(HoodieBaseFile.class);
       when(inputSplit.getBaseFileOption()).thenReturn(Option.of(baseFile));
-      when(readerParameters.useRecordPosition()).thenReturn(true);
+      when(readerParameters.shouldUseRecordPosition()).thenReturn(true);
     }
 
     Option<BaseFileUpdateCallback> fileGroupUpdateCallback = Option.empty();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 0d43c1d463ca..611128a96309 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -141,7 +141,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuf
     when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
     when(inputSplit.getRecordIterator()).thenReturn(inputRecords.iterator());
     ReaderParameters readerParameters = mock(ReaderParameters.class);
-    when(readerParameters.sortOutputs()).thenReturn(true);
+    when(readerParameters.isSortOutputs()).thenReturn(true);
     SortedKeyBasedFileGroupRecordBuffer fileGroupRecordBuffer  = 
(SortedKeyBasedFileGroupRecordBuffer<IndexedRecord>) recordBufferLoader
         .getRecordBuffer(readerContext, mockMetaClient.getStorage(), 
inputSplit, Collections.singletonList("ts"), mockMetaClient, properties,
             readerParameters, readStats, Option.empty()).getKey();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index d62763ef64af..0bd7f53b611f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -132,14 +132,16 @@ public class FormatUtils {
     final TypedProperties typedProps = 
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
     typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
 
-    return HoodieFileGroupReader.<RowData>newBuilder()
+    return HoodieFileGroupReader.<RowData>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(latestInstant)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(tableSchema)
         .withRequestedSchema(requiredSchema)
-        
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+        
.withInternalSchemaOpt(Option.ofNullable(internalSchemaManager.getQuerySchema()))
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
         .withEmitDelete(emitDelete)
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index bead23772f80..1ec6d41d5b87 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -137,11 +137,14 @@ public class HoodieFileGroupReaderTestHarness extends 
HoodieCommonTestHarness {
     properties.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),  
basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
     properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), 
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
     
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 "false");
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    FileSlice fileSlice = fileSliceOpt.orElseThrow(() -> new 
IllegalArgumentException("FileSlice is not present"));
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime("1000") // Not used internally.
-        .withFileSlice(fileSliceOpt.orElseThrow(() -> new 
IllegalArgumentException("FileSlice is not present")))
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(HOODIE_SCHEMA)
         .withRequestedSchema(HOODIE_SCHEMA)
         .withProps(properties)
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index 1fbb6ad4a982..22d5ba701711 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -145,11 +145,13 @@ public class HoodieFileGroupReaderBasedRecordReader 
implements RecordReader<Null
     LOG.debug("Creating HoodieFileGroupReaderRecordReader with 
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, 
latestCommitTime, fileSplit.getPath());
     FileSlice fileSlice = getFileSliceFromSplit(fileSplit, 
getFs(tableBasePath, jobConfCopy), tableBasePath);
     this.containsBaseFile = fileSlice.getBaseFile().isPresent();
-    this.recordIterator = HoodieFileGroupReader.<ArrayWritable>newBuilder()
+    this.recordIterator = HoodieFileGroupReader.<ArrayWritable>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(latestCommitTime)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(tableSchema)
         .withRequestedSchema(requestedSchema)
         .withProps(props)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0104136f8424..0bb4f42931e8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -172,7 +172,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
           val requestedSchema = requiredSchema.schema
           val instantRange = 
InstantRange.builder().rangeType(RangeType.EXACT_MATCH).explicitInstants(validInstants.value).build()
           val readerContext = new HoodieAvroReaderContext(storageConf, 
metaClient.getTableConfig, HOption.of(instantRange), 
HOption.empty().asInstanceOf[HOption[HPredicate]])
-          val fileGroupReader: HoodieFileGroupReader[IndexedRecord] = 
HoodieFileGroupReader.newBuilder()
+          val fileGroupReader: HoodieFileGroupReader[IndexedRecord] = 
HoodieFileGroupReader.builder()
             .withReaderContext(readerContext)
             .withHoodieTableMetaClient(metaClient)
             .withLatestCommitTime(tableState.latestCommitTimestamp.orNull)
@@ -182,13 +182,13 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
             .withProps(properties)
             .withDataSchema(tableSchema.schema)
             .withRequestedSchema(requestedSchema)
-            
.withInternalSchema(HOption.ofNullable(tableSchema.internalSchema.orNull))
+            
.withInternalSchemaOpt(HOption.ofNullable(tableSchema.internalSchema.orNull))
             .build()
           convertAvroToRowIterator(fileGroupReader.getClosableIterator, 
requestedSchema)
         } else {
           val readerContext = new 
SparkFileFormatInternalRowReaderContext(fileGroupBaseFileReader.value, 
optionalFilters,
             Seq.empty, storageConf, metaClient.getTableConfig)
-          val fileGroupReader = HoodieFileGroupReader.newBuilder()
+          val fileGroupReader = HoodieFileGroupReader.builder()
             .withReaderContext(readerContext)
             .withHoodieTableMetaClient(metaClient)
             .withLatestCommitTime(tableState.latestCommitTimestamp.orNull)
@@ -198,7 +198,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
             .withProps(properties)
             .withDataSchema(tableSchema.schema)
             .withRequestedSchema(requiredSchema.schema)
-            
.withInternalSchema(HOption.ofNullable(tableSchema.internalSchema.orNull))
+            
.withInternalSchemaOpt(HOption.ofNullable(tableSchema.internalSchema.orNull))
             .build()
           convertCloseableIterator(fileGroupReader.getClosableIterator)
         }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 7a97367bb3c5..4970be3c249b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -513,13 +513,15 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
   }
 
   private def loadFileSlice(fileSlice: FileSlice, readerContext: 
SparkFileFormatInternalRowReaderContext): Iterator[BufferedRecord[InternalRow]] 
= {
-    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+    val fileGroupReader = HoodieFileGroupReader.builder()
       .withReaderContext(readerContext)
       .withHoodieTableMetaClient(metaClient)
-      .withFileSlice(fileSlice)
+      .withBaseFileOption(fileSlice.getBaseFile)
+      .withLogFiles(fileSlice.getLogFiles)
+      .withPartitionPath(fileSlice.getPartitionPath)
       .withDataSchema(schema)
       .withRequestedSchema(schema)
-      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withInternalSchemaOpt(toJavaOption(originTableSchema.internalSchema))
       .withProps(readerProperties)
       .withLatestCommitTime(split.changes.last.getInstant)
       .build()
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index de4ffb400d4c..da5bef9f785c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -313,14 +313,16 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
               } else {
                 0
               }
-              val reader = HoodieFileGroupReader.newBuilder()
+              val reader = HoodieFileGroupReader.builder()
                 .withReaderContext(readerContext)
                 .withHoodieTableMetaClient(metaClient)
                 .withLatestCommitTime(queryTimestamp)
-                .withFileSlice(fileSlice)
+                .withBaseFileOption(fileSlice.getBaseFile)
+                .withLogFiles(fileSlice.getLogFiles)
+                .withPartitionPath(fileSlice.getPartitionPath)
                 .withDataSchema(dataSchema)
                 .withRequestedSchema(requestedSchema)
-                .withInternalSchema(internalSchemaOpt)
+                .withInternalSchemaOpt(internalSchemaOpt)
                 .withProps(props)
                 .withStart(file.start)
                 .withLength(baseFileLength)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 6eaade123a64..4e044ecb6786 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -225,14 +225,16 @@ class PartitionBucketIndexManager extends BaseProcedure
           // instantiate other supporting cast
           val internalSchemaOption: Option[InternalSchema] = Option.empty()
           // instantiate FG reader
-          val fileGroupReader = HoodieFileGroupReader.newBuilder()
+          val fileGroupReader = HoodieFileGroupReader.builder()
             .withReaderContext(readerContextFactory.getContext)
             .withHoodieTableMetaClient(metaClient)
             .withLatestCommitTime(latestInstantTime.requestedTime())
-            .withFileSlice(fileSlice)
+            .withBaseFileOption(fileSlice.getBaseFile)
+            .withLogFiles(fileSlice.getLogFiles)
+            .withPartitionPath(fileSlice.getPartitionPath)
             .withDataSchema(tableSchemaWithMetaFields)
             .withRequestedSchema(tableSchemaWithMetaFields)
-            .withInternalSchema(internalSchemaOption) // not support evolution 
of schema for now
+            .withInternalSchemaOpt(internalSchemaOption) // not support 
evolution of schema for now
             .withProps(metaClient.getTableConfig.getProps)
             .withShouldUseRecordPosition(false)
             .build()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index b35da0450cfd..7dbd7e7aaf96 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -712,7 +712,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration 
extends HoodieClientTestBa
       TypedProperties properties = new TypedProperties();
       // configure un-merged log file reader
       HoodieReaderContext readerContext = 
context.getReaderContextFactory(metaClient).getContext();
-      HoodieFileGroupReader reader = HoodieFileGroupReader.newBuilder()
+      HoodieFileGroupReader reader = HoodieFileGroupReader.builder()
           .withReaderContext(readerContext)
           .withDataSchema(writerSchemaOpt.get())
           .withRequestedSchema(writerSchemaOpt.get())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index b3beca5fa18c..b0a4aab2e789 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1664,7 +1664,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
                                            String latestCommitTimestamp) {
     HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
     HoodieAvroReaderContext readerContext = new 
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), 
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLogFiles(logFiles.stream())

Reply via email to