This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 89631a812eb [HUDI-8896] FileGroupReader Bootstrap support (#13223)
89631a812eb is described below

commit 89631a812ebc80dd5603bc525e069479f4568a1d
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 6 21:54:44 2025 -0500

    [HUDI-8896] FileGroupReader Bootstrap support (#13223)
---
 .../apache/hudi/client/model/BootstrapRowData.java | 150 ++++++++++++++++++
 .../read/HoodieFileGroupReaderOnJavaTestBase.java  |   2 +-
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |  15 +-
 .../hudi/common/model/HoodieSparkRecord.java       |   3 +-
 .../hudi/BaseSparkInternalRowReaderContext.java    |  22 ++-
 .../SparkFileFormatInternalRowReaderContext.scala  |  20 ++-
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |  68 +++++---
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |  18 ++-
 .../hudi/common/engine/HoodieReaderContext.java    |  12 +-
 .../hudi/common/table/PartitionPathParser.java     | 176 +++++++++++++++++++++
 .../common/table/read/HoodieFileGroupReader.java   |  29 +++-
 .../hudi/avro/TestHoodieAvroReaderContext.java     |   8 +-
 .../hudi/common/table/TestPartitionPathParser.java |  94 +++++++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  | 114 ++++++++++---
 .../common/testutils/HoodieTestDataGenerator.java  |  24 ++-
 .../resources/file-group-reader/bootstrap_data.zip | Bin 0 -> 157171 bytes
 .../table/format/FlinkRowDataReaderContext.java    |  18 ++-
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  12 +-
 .../hudi/hadoop/utils/ObjectInspectorCache.java    |   2 +-
 .../hudi/common/TestHoodieInternalRowUtils.scala   |  32 +++-
 pom.xml                                            |   1 +
 21 files changed, 737 insertions(+), 83 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
new file mode 100644
index 00000000000..513b8b1994b
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * {@link RowData} implementation used when reading from a Spark bootstrapped table.
+ *
+ * <p>In these tables, the partition values are not always written to the data files, so a field that is
+ * null in the wrapped row is served from the value inferred from the file's partition path whenever one
+ * is registered for that ordinal. A non-null value in the wrapped row always takes precedence.
+ */
+public class BootstrapRowData implements RowData {
+  /** Row read from the data file; may hold nulls at the partition field ordinals. */
+  private final RowData row;
+  /** Field ordinal to the partition value inferred from the partition path. */
+  private final Map<Integer, Object> partitionOrdinalToValues;
+
+  /**
+   * @param row                      the row read from the data file
+   * @param partitionOrdinalToValues fallback values keyed by field ordinal; a value is handed back as-is
+   *                                 through an unchecked cast, so it must already have the type returned
+   *                                 by the getter used for that ordinal
+   */
+  public BootstrapRowData(RowData row, Map<Integer, Object> partitionOrdinalToValues) {
+    this.row = row;
+    this.partitionOrdinalToValues = partitionOrdinalToValues;
+  }
+
+  @Override
+  public int getArity() {
+    return row.getArity();
+  }
+
+  @Override
+  public RowKind getRowKind() {
+    return row.getRowKind();
+  }
+
+  @Override
+  public void setRowKind(RowKind kind) {
+    row.setRowKind(kind);
+  }
+
+  /**
+   * A field is null only when the wrapped row has no value for it AND no partition value is registered
+   * for its ordinal. This must stay consistent with {@link #getValue}: regular columns report the
+   * nullness of the wrapped row, and partition columns are never reported as null because a fallback
+   * value is available for them.
+   */
+  @Override
+  public boolean isNullAt(int pos) {
+    return !partitionOrdinalToValues.containsKey(pos) && row.isNullAt(pos);
+  }
+
+  @Override
+  public boolean getBoolean(int pos) {
+    return getValue(pos, row::getBoolean);
+  }
+
+  @Override
+  public byte getByte(int pos) {
+    return getValue(pos, row::getByte);
+  }
+
+  @Override
+  public short getShort(int pos) {
+    return getValue(pos, row::getShort);
+  }
+
+  @Override
+  public int getInt(int pos) {
+    return getValue(pos, row::getInt);
+  }
+
+  @Override
+  public long getLong(int pos) {
+    return getValue(pos, row::getLong);
+  }
+
+  @Override
+  public float getFloat(int pos) {
+    return getValue(pos, row::getFloat);
+  }
+
+  @Override
+  public double getDouble(int pos) {
+    return getValue(pos, row::getDouble);
+  }
+
+  @Override
+  public StringData getString(int pos) {
+    return getValue(pos, row::getString);
+  }
+
+  @Override
+  public DecimalData getDecimal(int pos, int precision, int scale) {
+    return getValue(pos, (p) -> row.getDecimal(p, precision, scale));
+  }
+
+  @Override
+  public TimestampData getTimestamp(int pos, int precision) {
+    return getValue(pos, (p) -> row.getTimestamp(p, precision));
+  }
+
+  @Override
+  public <T> RawValueData<T> getRawValue(int pos) {
+    return getValue(pos, row::getRawValue);
+  }
+
+  @Override
+  public byte[] getBinary(int pos) {
+    return getValue(pos, row::getBinary);
+  }
+
+  @Override
+  public ArrayData getArray(int pos) {
+    // bootstrap partition values cannot be arrays
+    return row.getArray(pos);
+  }
+
+  @Override
+  public MapData getMap(int pos) {
+    // bootstrap partition values cannot be maps
+    return row.getMap(pos);
+  }
+
+  @Override
+  public RowData getRow(int pos, int numFields) {
+    // bootstrap partition values cannot be rows
+    return row.getRow(pos, numFields);
+  }
+
+  /**
+   * Reads the field at {@code pos}, falling back to the partition value when the wrapped row holds null
+   * at that ordinal and a partition value is registered for it.
+   *
+   * @param pos    the field ordinal
+   * @param getter the typed accessor of the wrapped row to use when no fallback applies
+   * @return the value from the wrapped row, or the registered partition value
+   */
+  @SuppressWarnings("unchecked") // partition values are stored untyped; the calling getter determines T
+  private <T> T getValue(int pos, Function<Integer, T> getter) {
+    if (row.isNullAt(pos) && partitionOrdinalToValues.containsKey(pos)) {
+      return (T) partitionOrdinalToValues.get(pos);
+    }
+    return getter.apply(pos);
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
index 2eedb63b4a9..1b6c7b83fb4 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
@@ -48,7 +48,7 @@ public abstract class HoodieFileGroupReaderOnJavaTestBase<T> 
extends TestHoodieF
 
   @Override
   public String getBasePath() {
-    return tempDir.toAbsolutePath() + "/myTable";
+    return "file://" + tempDir.toAbsolutePath() + "/myTable";
   }
 
   @Override
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index ffb194696c0..c01ed5a7a33 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -19,7 +19,6 @@
 
 package org.apache.hudi.hadoop;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -39,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.jupiter.api.AfterAll;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeAll;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
@@ -92,7 +94,7 @@ public class TestHoodieFileGroupReaderOnHive extends 
HoodieFileGroupReaderOnJava
   public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String 
tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, 
HoodieTableMetaClient metaClient) {
     HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = 
(inputSplit, jobConf) -> new 
MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
     JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
-    setupJobconf(jobConf, metaClient.getTableConfig().populateMetaFields());
+    setupJobconf(jobConf, avroSchema);
     return new HiveHoodieReaderContext(readerCreator,
         getStoredPartitionFieldNames(new 
JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
         new ObjectInspectorCache(avroSchema, jobConf), storageConf, 
metaClient.getTableConfig());
@@ -103,12 +105,13 @@ public class TestHoodieFileGroupReaderOnHive extends 
HoodieFileGroupReaderOnJava
     ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, 
false);
   }
 
-  private void setupJobconf(JobConf jobConf, boolean populateMetaFields) {
-    Schema schema = populateMetaFields ? 
HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA) : 
HoodieTestDataGenerator.AVRO_SCHEMA;
+  private void setupJobconf(JobConf jobConf, Schema schema) {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
-    String metaFieldTypes = "string,string,string,string,string,";
-    jobConf.set("columns.types", (populateMetaFields ? metaFieldTypes : "") + 
HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES + ",string");
+    List<TypeInfo> types = 
TypeInfoUtils.getTypeInfosFromTypeString(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
+    Map<String, String> typeMappings = 
HoodieTestDataGenerator.AVRO_SCHEMA.getFields().stream().collect(Collectors.toMap(Schema.Field::name,
 field -> types.get(field.pos()).getTypeName()));
+    String columnTypes = fields.stream().map(field -> 
typeMappings.getOrDefault(field.name(), 
"string")).collect(Collectors.joining(","));
+    jobConf.set("columns.types", columnTypes + ",string");
   }
 
   private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf 
jobConf, boolean isPartitioned) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index d94888fd83b..c810e7628cd 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -49,6 +49,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
@@ -217,7 +218,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
     StructType newStructType = 
HoodieInternalRowUtils.getCachedSchema(newSchema);
 
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, 
newStructType, renameCols);
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, 
newStructType, renameCols, Collections.emptyMap());
 
     UnsafeRow unsafeRow = unsafeRowWriter.apply(this.data);
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 9ed37e1e04a..dc2ad7f0b15 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
@@ -43,8 +44,11 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 
 import scala.Function1;
 
@@ -132,13 +136,23 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
   @Override
   public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to, 
Map<String, String> renamedColumns) {
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), 
getCachedSchema(to), renamedColumns);
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), 
getCachedSchema(to), renamedColumns, Collections.emptyMap());
     return row -> (InternalRow) unsafeRowWriter.apply(row);
-
   }
 
-  protected UnaryOperator<InternalRow> getIdentityProjection() {
-    return row -> row;
+  /**
+   * Constructs a transformation that will take a row and convert it to a new 
row with the given schema and adds in the values for the partition columns if 
they are missing in the returned row.
+   * It is assumed that the `to` schema will contain the partition fields.
+   * @param from the original schema
+   * @param to the schema the row will be converted to
+   * @param partitionFieldAndValues the partition fields and their values, if 
any are required by the reader
+   * @return a function for transforming the row
+   */
+  protected UnaryOperator<InternalRow> getBootstrapProjection(Schema from, 
Schema to, List<Pair<String, Object>> partitionFieldAndValues) {
+    Map<Integer, Object> partitionValuesByIndex = 
partitionFieldAndValues.stream().collect(Collectors.toMap(pair -> 
to.getField(pair.getKey()).pos(), Pair::getRight));
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), 
getCachedSchema(to), Collections.emptyMap(), partitionValuesByIndex);
+    return row -> (InternalRow) unsafeRowWriter.apply(row);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 1c595550405..ec9efab469a 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -27,10 +27,11 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import 
org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator}
+import org.apache.hudi.common.util.collection.{CachingIterator, 
ClosableIterator, Pair => HPair}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, 
HoodieSparkParquetReader}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, 
StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
@@ -147,15 +148,17 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
   override def mergeBootstrapReaders(skeletonFileIterator: 
ClosableIterator[InternalRow],
                                      skeletonRequiredSchema: Schema,
                                      dataFileIterator: 
ClosableIterator[InternalRow],
-                                     dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
+                                     dataRequiredSchema: Schema,
+                                     partitionFieldAndValues: 
java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
     doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], 
skeletonRequiredSchema,
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], 
dataRequiredSchema, partitionFieldAndValues)
   }
 
   private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
                                skeletonRequiredSchema: Schema,
                                dataFileIterator: ClosableIterator[Any],
-                               dataRequiredSchema: Schema): 
ClosableIterator[InternalRow] = {
+                               dataRequiredSchema: Schema,
+                               partitionFieldAndValues: 
java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
     if (supportsParquetRowIndex()) {
       assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
       assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, 
ROW_INDEX_TEMPORARY_COLUMN_NAME))
@@ -167,10 +170,10 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
 
       //If we need to do position based merging with log files we will leave 
the row index column at the end
       val dataProjection = if (getHasLogFiles && 
getShouldMergeUseRecordPosition) {
-        getIdentityProjection
+        getBootstrapProjection(dataRequiredSchema, dataRequiredSchema, 
partitionFieldAndValues)
       } else {
-        projectRecord(dataRequiredSchema,
-          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
+        getBootstrapProjection(dataRequiredSchema,
+          HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn), 
partitionFieldAndValues)
       }
 
       //row index will always be the last column
@@ -224,6 +227,7 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
         }
       }
     } else {
+      val dataProjection = getBootstrapProjection(dataRequiredSchema, 
dataRequiredSchema, partitionFieldAndValues)
       new ClosableIterator[Any] {
         val combinedRow = new JoinedRow()
 
@@ -251,7 +255,7 @@ class 
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
               sparkAdapter.makeColumnarBatch(vecs, s.numRows())
             case (_: ColumnarBatch, _: InternalRow) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
             case (_: InternalRow, _: ColumnarBatch) => throw new 
IllegalStateException("InternalRow ColumnVector mismatch")
-            case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            case (s: InternalRow, d: InternalRow) => combinedRow(s, 
dataProjection.apply(d))
           }
         }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 519519fea37..fda5f4b0a2b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -44,13 +44,14 @@ import scala.collection.mutable.ArrayBuffer
 object HoodieInternalRowUtils {
 
   private type RenamedColumnMap = JMap[String, String]
+  private type UpdatedColumnMap = JMap[Integer, Object]
   private type UnsafeRowWriter = InternalRow => UnsafeRow
 
   // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] 
since these are not thread-safe
-  private val unsafeWriterThreadLocal: 
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), 
UnsafeRowWriter]] =
-    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, 
StructType, RenamedColumnMap), UnsafeRowWriter]] {
-      override def get(): mutable.HashMap[(StructType, StructType, 
RenamedColumnMap), UnsafeRowWriter] =
-        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), 
UnsafeRowWriter]
+  private val unsafeWriterThreadLocal: 
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap, 
UpdatedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, 
StructType, RenamedColumnMap, UpdatedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, 
RenamedColumnMap, UpdatedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap, 
UpdatedColumnMap), UnsafeRowWriter]
     })
 
   // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] 
since these are not thread-safe
@@ -102,9 +103,11 @@ object HoodieInternalRowUtils {
    *   <li>Handling (field) renames</li>
    * </ul>
    */
-  def getCachedUnsafeRowWriter(from: StructType, to: StructType, 
renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): 
UnsafeRowWriter = {
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType,
+                               renamedColumnsMap: JMap[String, String] = 
JCollections.emptyMap(),
+                               updatedValuesMap: JMap[Integer, Object] = 
JCollections.emptyMap()): UnsafeRowWriter = {
     unsafeWriterThreadLocal.get()
-      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, 
to, renamedColumnsMap))
+      .getOrElseUpdate((from, to, renamedColumnsMap, updatedValuesMap), 
genUnsafeRowWriter(from, to, renamedColumnsMap, updatedValuesMap))
   }
 
   def getCachedPosList(structType: StructType, field: String): 
Option[NestedFieldPath] = {
@@ -142,9 +145,18 @@ object HoodieInternalRowUtils {
 
   private[sql] def genUnsafeRowWriter(prevSchema: StructType,
                                       newSchema: StructType,
-                                      renamedColumnsMap: JMap[String, 
String]): UnsafeRowWriter = {
-    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, 
new JArrayDeque[String]())
+                                      renamedColumnsMap: JMap[String, String],
+                                      updatedValuesMap: JMap[Integer, 
Object]): UnsafeRowWriter = {
     val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    if (prevSchema.equals(newSchema) && renamedColumnsMap.isEmpty && 
updatedValuesMap.isEmpty) {
+      return oldRow => unsafeProjection(oldRow)
+    }
+    val writer = if (newSchema.equals(prevSchema)) {
+      // Force a modifiable row to be generated so the updated values can be 
set
+      modifiableRowWriter(prevSchema, newSchema, renamedColumnsMap, new 
JArrayDeque[String]())
+    } else {
+      newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new 
JArrayDeque[String]())
+    }
     val phonyUpdater = new CatalystDataUpdater {
       var value: InternalRow = _
 
@@ -154,6 +166,11 @@ object HoodieInternalRowUtils {
 
     oldRow => {
       writer(phonyUpdater, 0, oldRow)
+      updatedValuesMap.forEach((index, value) => {
+        if (phonyUpdater.value.isNullAt(index)) {
+          phonyUpdater.value.update(index, value)
+        }
+      })
       unsafeProjection(phonyUpdater.value)
     }
   }
@@ -218,6 +235,26 @@ object HoodieInternalRowUtils {
     }
   }
 
+  private def modifiableRowWriter(prevStructType: StructType,
+                                  newStructType: StructType,
+                                  renamedColumnsMap: JMap[String, String],
+                                  fieldNameStack: JDeque[String]): 
RowFieldUpdater = {
+    val writer = genUnsafeStructWriter(prevStructType, newStructType, 
renamedColumnsMap, fieldNameStack)
+
+    val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+    val rowUpdater = new RowUpdater(newRow)
+
+    (fieldUpdater, ordinal, value) => {
+      // Here new row is built in 2 stages:
+      //    - First, we pass mutable row (used as buffer/scratchpad) created 
above wrapped into [[RowUpdater]]
+      //      into generated row-writer
+      //    - Upon returning from row-writer, we call back into parent row's 
[[fieldUpdater]] to set returned
+      //      row as a value in it
+      writer(rowUpdater, value)
+      fieldUpdater.set(ordinal, newRow)
+    }
+  }
+
   private def newWriterRenaming(prevDataType: DataType,
                                 newDataType: DataType,
                                 renamedColumnsMap: JMap[String, String],
@@ -227,20 +264,7 @@ object HoodieInternalRowUtils {
         (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
 
       case (newStructType: StructType, prevStructType: StructType) =>
-        val writer = genUnsafeStructWriter(prevStructType, newStructType, 
renamedColumnsMap, fieldNameStack)
-
-        val newRow = new 
SpecificInternalRow(newStructType.fields.map(_.dataType))
-        val rowUpdater = new RowUpdater(newRow)
-
-        (fieldUpdater, ordinal, value) => {
-          // Here new row is built in 2 stages:
-          //    - First, we pass mutable row (used as buffer/scratchpad) 
created above wrapped into [[RowUpdater]]
-          //      into generated row-writer
-          //    - Upon returning from row-writer, we call back into parent 
row's [[fieldUpdater]] to set returned
-          //      row as a value in it
-          writer(rowUpdater, value)
-          fieldUpdater.set(ordinal, newRow)
-        }
+        modifiableRowWriter(prevStructType, newStructType, renamedColumnsMap, 
fieldNameStack)
 
       case (ArrayType(newElementType, _), ArrayType(prevElementType, 
containsNull)) =>
         fieldNameStack.push("element")
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 9ede9fc1239..d2f360e5ae1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
@@ -148,8 +149,9 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
   public ClosableIterator<IndexedRecord> 
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
                                                                Schema 
skeletonRequiredSchema,
                                                                
ClosableIterator<IndexedRecord> dataFileIterator,
-                                                               Schema 
dataRequiredSchema) {
-    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, 
dataFileIterator, dataRequiredSchema);
+                                                               Schema 
dataRequiredSchema,
+                                                               
List<Pair<String, Object>> partitionFieldAndValues) {
+    return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema, 
dataFileIterator, dataRequiredSchema, partitionFieldAndValues);
   }
 
   @Override
@@ -227,15 +229,20 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
     private final Schema dataRequiredSchema;
     private final Schema mergedSchema;
     private final int skeletonFields;
+    private final int[] partitionFieldPositions;
+    private final Object[] partitionValues;
 
     public BootstrapIterator(ClosableIterator<IndexedRecord> 
skeletonFileIterator, Schema skeletonRequiredSchema,
-                             ClosableIterator<IndexedRecord> dataFileIterator, 
Schema dataRequiredSchema) {
+                             ClosableIterator<IndexedRecord> dataFileIterator, 
Schema dataRequiredSchema,
+                             List<Pair<String, Object>> 
partitionFieldAndValues) {
       this.skeletonFileIterator = skeletonFileIterator;
       this.skeletonRequiredSchema = skeletonRequiredSchema;
       this.dataFileIterator = dataFileIterator;
       this.dataRequiredSchema = dataRequiredSchema;
       this.mergedSchema = AvroSchemaUtils.mergeSchemas(skeletonRequiredSchema, 
dataRequiredSchema);
       this.skeletonFields = skeletonRequiredSchema.getFields().size();
+      this.partitionFieldPositions = 
partitionFieldAndValues.stream().map(Pair::getLeft).map(field -> 
mergedSchema.getField(field).pos()).mapToInt(Integer::intValue).toArray();
+      this.partitionValues = 
partitionFieldAndValues.stream().map(Pair::getValue).toArray();
     }
 
     @Override
@@ -265,6 +272,11 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
         Schema.Field sourceField = 
dataRecord.getSchema().getField(dataField.name());
         mergedRecord.put(dataField.pos() + skeletonFields, 
dataRecord.get(sourceField.pos()));
       }
+      for (int i = 0; i < partitionFieldPositions.length; i++) {
+        if (mergedRecord.get(partitionFieldPositions[i]) == null) {
+          mergedRecord.put(partitionFieldPositions[i], partitionValues[i]);
+        }
+      }
       return mergedRecord;
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 98d6feeb0e7..c07b9b87e0c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,12 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -42,6 +43,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.UnaryOperator;
@@ -321,13 +323,17 @@ public abstract class HoodieReaderContext<T> {
    * skeleton file iterator, followed by all columns in the data file iterator
    *
    * @param skeletonFileIterator iterator over bootstrap skeleton files that 
contain hudi metadata columns
-   * @param dataFileIterator     iterator over data files that were 
bootstrapped into the hudi table
+   * @param skeletonRequiredSchema the schema of the skeleton file iterator
+   * @param dataFileIterator iterator over data files that were bootstrapped 
into the hudi table
+   * @param dataRequiredSchema the schema of the data file iterator
+   * @param requiredPartitionFieldAndValues the partition field names and 
their values that are required by the query
    * @return iterator that concatenates the skeletonFileIterator and 
dataFileIterator
    */
   public abstract ClosableIterator<T> 
mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator,
                                                             Schema 
skeletonRequiredSchema,
                                                             
ClosableIterator<T> dataFileIterator,
-                                                            Schema 
dataRequiredSchema);
+                                                            Schema 
dataRequiredSchema,
+                                                            List<Pair<String, 
Object>> requiredPartitionFieldAndValues);
 
   /**
    * Creates a function that will reorder records of schema "from" to schema 
of "to"
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
new file mode 100644
index 00000000000..3b846330e63
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+
+public class PartitionPathParser {
+  public static final String DEPRECATED_DEFAULT_PARTITION_PATH = "default";
+  public static final String DEFAULT_PARTITION_PATH = 
"__HIVE_DEFAULT_PARTITION__";
+  private static final String EQUALS_SIGN = "=";
+  private static final String DASH = "-";
+  private static final String SLASH = "/";
+
+  /**
+   * Parses the values of the configured partition fields out of a partition path.
+   *
+   * @param partitionFields names of the partition fields, absent when none are configured
+   * @param partitionPath   partition path whose segments carry the values
+   * @param writerSchema    schema used to look up the type of each partition field
+   * @return one value per partition field, in the order of {@code partitionFields};
+   *         an empty array when {@code partitionFields} is absent
+   */
+  public Object[] getPartitionFieldVals(Option<String[]> partitionFields,
+                                        String partitionPath,
+                                        Schema writerSchema) {
+    return partitionFields
+        .map(fields -> getPartitionValues(fields, partitionPath, writerSchema))
+        .orElseGet(() -> new Object[0]);
+  }
+
+  /**
+   * Resolves one value per partition field from the segments of the partition path.
+   *
+   * <p>Every field that is not date/time based consumes exactly one path segment. At most one
+   * field may have a date/time logical type; it consumes all segments that the other fields
+   * do not need, which allows layouts such as {@code yyyy/MM/dd}.
+   *
+   * @param partitionFields names of the partition fields, in path order
+   * @param partitionPath   partition path to parse
+   * @param schema          schema used to look up the field types; unknown fields are treated as strings
+   * @return the parsed values, aligned with {@code partitionFields}
+   * @throws IllegalArgumentException if the path has fewer segments than there are partition fields,
+   *                                  or if more than one field is date/time based
+   */
+  private static Object[] getPartitionValues(String[] partitionFields,
+                                             String partitionPath,
+                                             Schema schema) {
+    String[] parts = partitionPath.split(SLASH);
+    // each field needs at least one segment; fail early with context instead of an index error below
+    if (parts.length < partitionFields.length) {
+      throw new IllegalArgumentException("Partition path '" + partitionPath + "' has " + parts.length
+          + " segment(s) but " + partitionFields.length + " partition field(s) are configured: "
+          + String.join(",", partitionFields));
+    }
+    int pathSegment = 0;
+    boolean hasDateField = false;
+    Object[] partitionValues = new Object[partitionFields.length];
+    for (int i = 0; i < partitionFields.length; i++) {
+      String partitionField = partitionFields[i];
+      Schema.Field field = schema.getField(partitionField);
+      // if the field is not present in the schema, we assume it is a string
+      Schema fieldSchema = field == null ? Schema.create(Schema.Type.STRING) : resolveNullableSchema(field.schema());
+      LogicalType logicalType = fieldSchema.getLogicalType();
+      if (isTimeBasedLogicalType(logicalType)) {
+        if (hasDateField) {
+          throw new IllegalArgumentException("Only one date field based partition is supported");
+        }
+        hasDateField = true;
+        // all other fields own exactly one segment, so the date field owns the remainder
+        int numDateDirs = parts.length - partitionFields.length + 1;
+        partitionValues[i] = inferDateValue(partitionPath, parts, pathSegment, numDateDirs, fieldSchema);
+        pathSegment += numDateDirs;
+      } else {
+        String segment = parts[pathSegment];
+        // with hive-style partitioning the segment is "field=value"; the value is the last piece
+        String[] segmentParts = segment.split(EQUALS_SIGN);
+        partitionValues[i] = parseValue(segmentParts[segmentParts.length - 1], fieldSchema);
+        pathSegment++;
+      }
+    }
+    return partitionValues;
+  }
+
+  /**
+   * Converts the text of a single partition path segment into a value of the type described
+   * by the field schema.
+   *
+   * @param partitionValue raw value taken from the partition path
+   * @param fieldSchema    schema of the partition field
+   * @return {@code null} for either default-partition marker; otherwise the unescaped string,
+   *         the parsed number or boolean, a {@code BigDecimal} for decimal-typed BYTES/FIXED,
+   *         or the UTF-8 bytes of the value for other BYTES/FIXED fields
+   * @throws IllegalArgumentException if the schema type is not one of the types handled here
+   */
+  @VisibleForTesting
+  static Object parseValue(String partitionValue, Schema fieldSchema) {
+    boolean isDefaultPartition = partitionValue.equals(DEFAULT_PARTITION_PATH)
+        || partitionValue.equals(DEPRECATED_DEFAULT_PARTITION_PATH);
+    if (isDefaultPartition) {
+      return null;
+    }
+
+    final Schema.Type valueType = fieldSchema.getType();
+    switch (valueType) {
+      case BOOLEAN:
+        return Boolean.parseBoolean(partitionValue);
+      case INT:
+        return Integer.parseInt(partitionValue);
+      case LONG:
+        return Long.parseLong(partitionValue);
+      case FLOAT:
+        return Float.parseFloat(partitionValue);
+      case DOUBLE:
+        return Double.parseDouble(partitionValue);
+      case STRING:
+        return PartitionPathEncodeUtils.unescapePathName(partitionValue);
+      case FIXED:
+      case BYTES: {
+        // decimals are stored as BYTES/FIXED with a decimal logical type
+        boolean isDecimal = fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal;
+        if (isDecimal) {
+          return new java.math.BigDecimal(partitionValue);
+        }
+        return partitionValue.getBytes(StandardCharsets.UTF_8);
+      }
+      default:
+        throw new IllegalArgumentException("Unexpected type " + valueType);
+    }
+  }
+
+  /**
+   * Builds a date or timestamp value from the path segments that belong to a date/time based
+   * partition field.
+   *
+   * <p>A hive-style {@code name=} prefix and any dashes are removed from each segment and the
+   * remainders are concatenated. The result must have the form {@code yyyy}, {@code yyyyMM},
+   * {@code yyyyMMdd} or {@code yyyyMMddHH}; month and day default to 1 and hour to 0 when absent.
+   *
+   * @param partitionPath full partition path, only used in error messages
+   * @param parts         the partition path split into segments
+   * @param pathSegment   index of the first segment that belongs to the date field
+   * @param numDateDirs   number of consecutive segments that belong to the date field
+   * @param fieldSchema   schema of the date field
+   * @return a {@link Date} when the field has the date logical type, otherwise a {@link Timestamp}
+   *         of the parsed date-time interpreted in UTC
+   * @throws IllegalArgumentException if a hive-style segment has no value or the condensed value
+   *                                  has an unsupported length
+   */
+  private static Object inferDateValue(
+      String partitionPath,
+      String[] parts,
+      int pathSegment,
+      int numDateDirs,
+      Schema fieldSchema) {
+    StringBuilder condensedPartitionValue = new StringBuilder();
+    for (int i = 0; i < numDateDirs; i++) {
+      String partitionValue = parts[pathSegment + i];
+      // remove the field name if it is present due to hive-style partitioning
+      int equalsIndex = partitionValue.indexOf(EQUALS_SIGN);
+      if (equalsIndex >= 0) {
+        partitionValue = partitionValue.substring(equalsIndex + 1);
+        // "name=" carries no value; reject it instead of silently shifting the later components
+        if (partitionValue.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Unknown date format for partition path: " + partitionPath);
+        }
+      }
+      // segments never contain a slash because they were produced by splitting on it
+      condensedPartitionValue.append(partitionValue.replace(DASH, ""));
+    }
+    int length = condensedPartitionValue.length();
+    if (length != 4 && length != 6 && length != 8 && length != 10) {
+      throw new IllegalArgumentException(
+          "Unknown date format for partition path: " + partitionPath);
+    }
+    // components missing from the path fall back to the start of the enclosing period
+    int year = Integer.parseInt(condensedPartitionValue.substring(0, 4));
+    int month = length >= 6 ? Integer.parseInt(condensedPartitionValue.substring(4, 6)) : 1;
+    int day = length >= 8 ? Integer.parseInt(condensedPartitionValue.substring(6, 8)) : 1;
+    int hour = length >= 10 ? Integer.parseInt(condensedPartitionValue.substring(8, 10)) : 0;
+    LocalDateTime time = LocalDateTime.of(year, month, day, hour, 0);
+    if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
+      return Date.valueOf(time.toLocalDate());
+    }
+    return Timestamp.from(time.toInstant(ZoneOffset.UTC));
+  }
+
+  /**
+   * Tells whether a logical type is one of the date, time or timestamp types.
+   *
+   * @param logicalType logical type of a partition field, may be {@code null}
+   * @return {@code true} for date, time-millis/micros, timestamp-millis/micros and
+   *         local-timestamp-millis/micros; {@code false} otherwise, including for {@code null}
+   */
+  private static boolean isTimeBasedLogicalType(LogicalType logicalType) {
+    if (logicalType instanceof LogicalTypes.Date) {
+      return true;
+    }
+    boolean isTimestamp = logicalType instanceof LogicalTypes.TimestampMillis
+        || logicalType instanceof LogicalTypes.TimestampMicros;
+    boolean isTimeOfDay = logicalType instanceof LogicalTypes.TimeMillis
+        || logicalType instanceof LogicalTypes.TimeMicros;
+    boolean isLocalTimestamp = logicalType instanceof LogicalTypes.LocalTimestampMicros
+        || logicalType instanceof LogicalTypes.LocalTimestampMillis;
+    return isTimestamp || isTimeOfDay || isLocalTimestamp;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 570d9580ca0..0c972c936f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartitionPathParser;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -49,6 +50,8 @@ import org.apache.avro.Schema;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
@@ -71,6 +74,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
   private final HoodieReaderContext<T> readerContext;
   private final Option<HoodieBaseFile> hoodieBaseFileOption;
   private final List<HoodieLogFile> logFiles;
+  private final String partitionPath;
+  private final Option<String[]> partitionPathFields;
   private final HoodieStorage storage;
   private final TypedProperties props;
   // Byte offset to start reading from the base file
@@ -112,6 +117,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     this.start = start;
     this.length = length;
     HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+    this.partitionPath = fileSlice.getPartitionPath();
+    this.partitionPathFields = tableConfig.getPartitionFields();
     RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
     String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
     if 
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
@@ -228,8 +235,22 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       if (start != 0) {
         throw new IllegalArgumentException("Filegroup reader is doing 
bootstrap merge but we are not reading from the start of the base file");
       }
+      PartitionPathParser partitionPathParser = new PartitionPathParser();
+      Object[] partitionValues = 
partitionPathParser.getPartitionFieldVals(partitionPathFields, partitionPath, 
readerContext.getSchemaHandler().getTableSchema());
+      // filter out the partition values that are not required by the data 
schema
+      List<Pair<String, Object>> partitionPathFieldsAndValues = 
partitionPathFields.map(partitionFields -> {
+        Schema dataSchema = dataFileIterator.get().getRight();
+        List<Pair<String, Object>> filterFieldsAndValues = new 
ArrayList<>(partitionFields.length);
+        for (int i = 0; i < partitionFields.length; i++) {
+          String field = partitionFields[i];
+          if (dataSchema.getField(field) != null) {
+            filterFieldsAndValues.add(Pair.of(field, 
readerContext.convertValueToEngineType((Comparable) partitionValues[i])));
+          }
+        }
+        return filterFieldsAndValues;
+      }).orElseGet(Collections::emptyList);
       return 
readerContext.mergeBootstrapReaders(skeletonFileIterator.get().getLeft(), 
skeletonFileIterator.get().getRight(),
-          dataFileIterator.get().getLeft(), dataFileIterator.get().getRight());
+          dataFileIterator.get().getLeft(), dataFileIterator.get().getRight(), 
partitionPathFieldsAndValues);
     }
   }
 
@@ -253,7 +274,11 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
       return 
Option.of(Pair.of(readerContext.getFileRecordIterator(fileStoragePathInfo, 0, 
file.getFileLen(),
           readerContext.getSchemaHandler().createSchemaFromFields(allFields), 
requiredSchema, storage), requiredSchema));
     } else {
-      return 
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0, 
file.getFileLen(),
+      // If the base file length passed in is invalid, i.e., -1,
+      // the file group reader fetches the length from the file system
+      long fileLength = file.getFileLen() >= 0
+          ? file.getFileLen() : 
storage.getPathInfo(file.getStoragePath()).getLength();
+      return 
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0, 
fileLength,
           readerContext.getSchemaHandler().createSchemaFromFields(allFields), 
requiredSchema, storage), requiredSchema));
     }
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
index bcf9fbf5d61..7d48efe594b 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
@@ -79,7 +79,7 @@ class TestHoodieAvroReaderContext {
     ClosableIterator<IndexedRecord> baseIterator = 
ClosableIterator.wrap(Arrays.asList(baseRecord1, baseRecord2, 
baseRecord3).iterator());
 
     List<IndexedRecord> actual = new ArrayList<>();
-    try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, 
LIMITED_SKELETON_SCHEMA, baseIterator, LIMITED_BASE_SCHEMA)) {
+    try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, 
LIMITED_SKELETON_SCHEMA, baseIterator, LIMITED_BASE_SCHEMA, 
Collections.emptyList())) {
       iter.forEachRemaining(actual::add);
     }
     assertEquals(Arrays.asList(expectedRecord1, expectedRecord2, 
expectedRecord3), actual);
@@ -105,7 +105,7 @@ class TestHoodieAvroReaderContext {
     ClosableIterator<IndexedRecord> baseIterator = 
ClosableIterator.wrap(Arrays.asList(baseRecord1, baseRecord2, 
baseRecord3).iterator());
 
     List<IndexedRecord> actual = new ArrayList<>();
-    try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA, 
baseIterator, BASE_SCHEMA)) {
+    try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA, 
baseIterator, BASE_SCHEMA, Collections.emptyList())) {
       iter.forEachRemaining(actual::add);
     }
     assertEquals(Arrays.asList(expectedRecord1, expectedRecord2, 
expectedRecord3), actual);
@@ -116,7 +116,7 @@ class TestHoodieAvroReaderContext {
     HoodieAvroReaderContext avroReaderContext = 
getReaderContextWithMetaFields();
     List<IndexedRecord> actual = new ArrayList<>();
     try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(ClosableIterator.wrap(Collections.emptyIterator()),
 SKELETON_SCHEMA,
-        ClosableIterator.wrap(Collections.emptyIterator()), BASE_SCHEMA)) {
+        ClosableIterator.wrap(Collections.emptyIterator()), BASE_SCHEMA, 
Collections.emptyList())) {
       iter.forEachRemaining(actual::add);
     }
     assertEquals(Collections.emptyList(), actual);
@@ -136,7 +136,7 @@ class TestHoodieAvroReaderContext {
 
     List<IndexedRecord> actual = new ArrayList<>();
     assertThrows(IllegalStateException.class, () -> {
-      try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA, 
baseIterator, BASE_SCHEMA)) {
+      try (ClosableIterator<IndexedRecord> iter = 
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA, 
baseIterator, BASE_SCHEMA, Collections.emptyList())) {
         iter.forEachRemaining(actual::add);
       }
     });
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
new file mode 100644
index 00000000000..e65f4499291
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestPartitionPathParser {
+
+  /**
+   * Supplies the cases for {@code testGetPartitionFieldVals}. Each case is a triple of
+   * (partition path, partition field names, expected parsed values). The field names refer to
+   * the schema built in that test: {@code string_field}, {@code date_field} and
+   * {@code timestamp_field}.
+   */
+  private static Stream<Arguments> partitionPathCases() {
+    return Stream.of(
+        // timestamp at hour granularity: slash separated, dash separated, and hive-style.
+        // 1735941600000L is 2025-01-03T22:00:00Z
+        Arguments.of("2025/01/03/22", new String[]{"timestamp_field"}, new 
Object[]{new Timestamp(1735941600000L)}),
+        Arguments.of("2025-01-03-22", new String[]{"timestamp_field"}, new 
Object[]{new Timestamp(1735941600000L)}),
+        Arguments.of("timestamp_field=2025-01-03-22", new 
String[]{"timestamp_field"}, new Object[]{new Timestamp(1735941600000L)}),
+        // date at day, month and year granularity; missing month/day resolve to 01
+        Arguments.of("2025/01/03", new String[]{"date_field"}, new 
Object[]{Date.valueOf("2025-01-03")}),
+        Arguments.of("2025/01", new String[]{"date_field"}, new 
Object[]{Date.valueOf("2025-01-01")}),
+        Arguments.of("2025", new String[]{"date_field"}, new 
Object[]{Date.valueOf("2025-01-01")}),
+        // a string field combined with a multi-directory date field, in either order
+        Arguments.of("value1/2025/01/03", new 
String[]{"string_field","date_field"}, new Object[]{"value1", 
Date.valueOf("2025-01-03")}),
+        Arguments.of("2025/01/03/value1", new String[]{"date_field", 
"string_field"}, new Object[]{Date.valueOf("2025-01-03"), "value1"}),
+        // hive-style paths where each date component has its own "name=" directory.
+        // 1598594400000L is 2020-08-28T06:00:00Z
+        Arguments.of("string_field=value1/year=2020/month=08/day=28/hour=06", 
new String[]{"string_field", "timestamp_field"}, new Object[]{"value1", new 
Timestamp(1598594400000L)}),
+        Arguments.of("year=2020/month=08/day=28/hour=06/string_field=value1", 
new String[]{"timestamp_field", "string_field"}, new Object[]{new 
Timestamp(1598594400000L), "value1"}),
+        // no partition fields configured: the parser returns an empty array
+        Arguments.of("", null, new Object[]{})
+    );
+  }
+
+  /**
+   * Parses each partition path from {@code partitionPathCases} against a fixed three-field schema
+   * and checks that the parsed values match the expected ones element by element.
+   *
+   * @param partitionPath   partition path to parse
+   * @param partitionFields partition field names, or {@code null} when none are configured
+   * @param expectedValues  values the parser is expected to return, in field order
+   */
+  @ParameterizedTest
+  @MethodSource("partitionPathCases")
+  void testGetPartitionFieldVals(String partitionPath, String[] 
partitionFields, Object[] expectedValues) {
+    PartitionPathParser parser = new PartitionPathParser();
+    // schema: string_field is a nullable string, date_field an int with the date logical type,
+    // timestamp_field a long with the timestamp-millis logical type
+    Schema schema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":[{\"name\":\"string_field\",\"type\":[\"null\",
 \"string\"]},"
+        + "{\"name\":\"date_field\",\"type\": 
{\"type\":\"int\",\"logicalType\": 
\"date\"}},{\"name\":\"timestamp_field\",\"type\": 
{\"type\":\"long\",\"logicalType\": \"timestamp-millis\"}}]}");
+
+    // a null field array becomes an empty Option, i.e. no partition fields
+    Object[] result = 
parser.getPartitionFieldVals(Option.ofNullable(partitionFields), partitionPath, 
schema);
+    assertEquals(expectedValues.length, result.length);
+    for (int i = 0; i < expectedValues.length; i++) {
+      assertEquals(expectedValues[i], result[i]);
+    }
+  }
+
+  /**
+   * Supplies the cases for {@code testValueParsing}. Each case is a triple of
+   * (raw partition value, field schema, expected parsed value).
+   */
+  private static Stream<Arguments> fieldCases() {
+    return Stream.of(
+        // numeric and boolean primitives
+        Arguments.of("123", Schema.create(Schema.Type.LONG), 123L),
+        Arguments.of("123", Schema.create(Schema.Type.INT), 123),
+        Arguments.of("123.45", Schema.create(Schema.Type.DOUBLE), 123.45),
+        Arguments.of("123.45", Schema.create(Schema.Type.FLOAT), 123.45f),
+        Arguments.of("false", Schema.create(Schema.Type.BOOLEAN), false),
+        // both default-partition markers parse to null regardless of the field type
+        Arguments.of("__HIVE_DEFAULT_PARTITION__", 
Schema.create(Schema.Type.INT), null),
+        Arguments.of("default", Schema.create(Schema.Type.INT), null),
+        // a date-looking value stays a string when the field is a plain string
+        Arguments.of("2025-01-03", Schema.create(Schema.Type.STRING), 
"2025-01-03"),
+        // BYTES and FIXED without a decimal logical type yield the UTF-8 bytes of the value
+        Arguments.of("value1", Schema.create(Schema.Type.BYTES), 
"value1".getBytes(StandardCharsets.UTF_8)),
+        Arguments.of("value1", Schema.createFixed("fixed", "docs", null, 50), 
"value1".getBytes(StandardCharsets.UTF_8))
+    );
+  }
+
+  /**
+   * Checks that a single raw partition value is parsed into the expected typed value.
+   *
+   * @param value       raw partition value
+   * @param fieldSchema schema of the partition field
+   * @param expected    expected result of {@code PartitionPathParser.parseValue}
+   */
+  @ParameterizedTest
+  @MethodSource("fieldCases")
+  void testValueParsing(String value, Schema fieldSchema, Object expected) {
+    Object actual = PartitionPathParser.parseValue(value, fieldSchema);
+    if (expected instanceof byte[]) {
+      // byte arrays do not compare by content with assertEquals, so compare their decoded text;
+      // decode both sides as UTF-8 so the result does not depend on the platform default charset
+      String expectedString = new String((byte[]) expected, StandardCharsets.UTF_8);
+      String actualString = new String((byte[]) actual, StandardCharsets.UTF_8);
+      assertEquals(expectedString, actualString);
+    } else {
+      assertEquals(expected, actual);
+    }
+  }
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4e268d1bb0a..6135b21e950 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -29,7 +29,10 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -37,7 +40,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
@@ -51,17 +54,24 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -69,6 +79,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
@@ -129,7 +141,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
       commitToTable(initialRecords, INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
0, recordMergeMode,
+          getStorageConf(), getBasePath(), true, 0, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing a base file and a log 
file
@@ -138,7 +150,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> unmergedRecords = 
CollectionUtils.combine(initialRecords, updates);
       commitToTable(updates, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
1, recordMergeMode,
+          getStorageConf(), getBasePath(), true, 1, recordMergeMode,
           allRecords, unmergedRecords);
 
       // Three commits; reading one file group containing a base file and two 
log files
@@ -146,7 +158,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
       commitToTable(updates2, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 
2, recordMergeMode,
+          getStorageConf(), getBasePath(), true, 2, recordMergeMode,
           finalRecords, CollectionUtils.combine(unmergedRecords, updates2));
     }
   }
@@ -171,7 +183,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
       commitToTable(initialRecords, INSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
1, recordMergeMode,
+          getStorageConf(), getBasePath(), false, 1, recordMergeMode,
           initialRecords, initialRecords);
 
       // Two commits; reading one file group containing two log files
@@ -179,11 +191,27 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<HoodieRecord> allRecords = mergeRecordLists(updates, 
initialRecords);
       commitToTable(updates, UPSERT.value(), writeConfigs);
       validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 
2, recordMergeMode,
+          getStorageConf(), getBasePath(), false, 2, recordMergeMode,
           allRecords, CollectionUtils.combine(initialRecords, updates));
     }
   }
 
+  /**
+   * Reads a file group from the pre-built table under {@code bootstrap_data} and validates the
+   * output against the expected records stored next to it as JSON.
+   */
+  @Test
+  public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
+    // NOTE(review): extract(...) is defined outside this hunk; presumably it unpacks the
+    // bootstrap_data.zip test resource into the given directory - confirm
+    Path zipOutput = Paths.get(new URI(getBasePath()));
+    extract(zipOutput);
+    ObjectMapper objectMapper = new ObjectMapper();
+    Path basePath = zipOutput.resolve("bootstrap_data");
+    // expected output with log records merged, read from merged_records.json
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = new 
ArrayList<>();
+    
objectMapper.reader().forType(HoodieTestDataGenerator.RecordIdentifier.class).<HoodieTestDataGenerator.RecordIdentifier>readValues(basePath.resolve("merged_records.json").toFile())
+        .forEachRemaining(expectedRecords::add);
+    // expected output without merging, read from unmerged_records.json
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnMergedRecords = 
new ArrayList<>();
+    
objectMapper.reader().forType(HoodieTestDataGenerator.RecordIdentifier.class).<HoodieTestDataGenerator.RecordIdentifier>readValues(basePath.resolve("unmerged_records.json").toFile())
+        .forEachRemaining(expectedUnMergedRecords::add);
+    // containsBaseFile = true, expectedLogFileNum = 1
+    validateOutputFromFileGroupReaderWithExistingRecords(getStorageConf(), 
basePath.toString(), true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
+        expectedRecords, expectedUnMergedRecords);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
   public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType 
diskMapType) throws Exception {
@@ -193,7 +221,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       String baseMapPath = Files.createTempDirectory(null).toString();
       HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
       Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
-      List<FileSlice> fileSlices = getFileSlicesToRead(getStorageConf(), 
getBasePath(), metaClient, dataGen.getPartitionPaths(), true, 0);
+      List<FileSlice> fileSlices = getFileSlicesToRead(getStorageConf(), 
getBasePath(), metaClient, true, 0);
       List<T> records = readRecordsFromFileGroup(getStorageConf(), 
getBasePath(), metaClient, fileSlices,
           avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
       HoodieReaderContext<T> readerContext = 
getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
@@ -257,7 +285,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
 
   private void validateOutputFromFileGroupReader(StorageConfiguration<?> 
storageConf,
                                                  String tablePath,
-                                                 String[] partitionPaths,
                                                  boolean containsBaseFile,
                                                  int expectedLogFileNum,
                                                  RecordMergeMode 
recordMergeMode,
@@ -265,11 +292,25 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                  List<HoodieRecord> 
expectedHoodieUnmergedRecords) throws Exception {
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
     Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
-    // use reader context for conversion to engine specific objects
-    HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, 
avroSchema, getStorageConf(), metaClient);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, avroSchema);
     List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema);
-    List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, 
metaClient, partitionPaths, containsBaseFile, expectedLogFileNum);
+    validateOutputFromFileGroupReaderWithExistingRecords(
+        storageConf, tablePath, containsBaseFile, expectedLogFileNum, 
recordMergeMode,
+        expectedRecords, expectedUnmergedRecords);
+  }
+
+  private void 
validateOutputFromFileGroupReaderWithExistingRecords(StorageConfiguration<?> 
storageConf,
+                                                                    String 
tablePath,
+                                                                    boolean 
containsBaseFile,
+                                                                    int 
expectedLogFileNum,
+                                                                    
RecordMergeMode recordMergeMode,
+                                                                    
List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords,
+                                                                    
List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords) throws 
Exception {
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.createMetaClient(storageConf, tablePath);
+    Schema avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+    // use reader context for conversion to engine specific objects
+    HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath, 
avroSchema, getStorageConf(), metaClient);
+    List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath, 
metaClient, containsBaseFile, expectedLogFileNum);
     List<HoodieTestDataGenerator.RecordIdentifier> actualRecordList = 
convertEngineRecords(
         readRecordsFromFileGroup(storageConf, tablePath, metaClient, 
fileSlices, avroSchema, recordMergeMode, false),
         avroSchema, readerContext);
@@ -284,11 +325,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
   }
 
   private List<FileSlice> getFileSlicesToRead(StorageConfiguration<?> 
storageConf,
-                                       String tablePath,
-                                       HoodieTableMetaClient metaClient,
-                                       String[] partitionPaths,
-                                       boolean containsBaseFile,
-                                       int expectedLogFileNum) {
+                                              String tablePath,
+                                              HoodieTableMetaClient metaClient,
+                                              boolean containsBaseFile,
+                                              int expectedLogFileNum) {
     HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(storageConf);
     HoodieMetadataConfig metadataConfig = 
HoodieMetadataConfig.newBuilder().build();
     FileSystemViewManager viewManager = 
FileSystemViewManager.createViewManager(
@@ -298,9 +338,23 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         HoodieCommonConfig.newBuilder().build(),
         mc -> HoodieTableMetadata.create(
             engineContext, mc.getStorage(), metadataConfig, tablePath));
-    SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
-    List<FileSlice> fileSlices = 
Arrays.stream(partitionPaths).flatMap(fsView::getAllFileSlices).collect(Collectors.toList());
+    HoodieTableFileSystemView fsView =
+        (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+    List<String> relativePartitionPathList = FSUtils.getAllPartitionPaths(
+        engineContext, metaClient.getStorage(),
+        metadataConfig, metaClient.getBasePath().toString());
+    List<FileSlice> fileSlices =
+        relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+            .collect(Collectors.toList());
     fileSlices.forEach(fileSlice -> {
+      if (fileSlice.hasBootstrapBase()) {
+        // bootstrap file points to an absolute path
+        // Since the dataset is copied to a new tempDir for testing, we need 
to manipulate this path
+        HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+        String bootstrapPath = baseFile.getBootstrapBaseFile().get().getPath();
+        String newBootstrapPath = tablePath + "/" + 
bootstrapPath.substring(bootstrapPath.indexOf("bootstrap_table"));
+        baseFile.setBootstrapBaseFile(new BaseFile(newBootstrapPath));
+      }
       List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
       assertEquals(expectedLogFileNum, logFilePathList.size());
       assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
@@ -413,4 +467,26 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
             readerContext.getValue(record, schema, 
RIDER_FIELD_NAME).toString()))
         .collect(Collectors.toList());
   }
+
+  private void extract(Path target) throws IOException {
+    try (ZipInputStream zip = new 
ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip")))
 {
+      ZipEntry entry;
+
+      while ((entry = zip.getNextEntry()) != null) {
+        File file = target.resolve(entry.getName()).toFile();
+        if (entry.isDirectory()) {
+          file.mkdirs();
+          continue;
+        }
+        byte[] buffer = new byte[10000];
+        file.getParentFile().mkdirs();
+        try (BufferedOutputStream out = new 
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+          int count;
+          while ((count = zip.read(buffer)) != -1) {
+            out.write(buffer, 0, count);
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 55e52855bca..3549a0ba3e8 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -39,6 +39,8 @@ import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -1219,7 +1221,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     private final String partitionPath;
     private final String riderValue;
 
-    public RecordIdentifier(String recordKey, String partitionPath, String 
orderingVal, String riderValue) {
+    @JsonCreator
+    public RecordIdentifier(@JsonProperty("recordKey") String recordKey,
+                            @JsonProperty("partitionPath") String 
partitionPath,
+                            @JsonProperty("orderingVal") String orderingVal,
+                            @JsonProperty("riderValue") String riderValue) {
       this.recordKey = recordKey;
       this.orderingVal = orderingVal;
       this.partitionPath = partitionPath;
@@ -1254,5 +1260,21 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
     public int hashCode() {
       return Objects.hash(recordKey, orderingVal, partitionPath, riderValue);
     }
+
+    public String getRecordKey() {
+      return recordKey;
+    }
+
+    public String getOrderingVal() {
+      return orderingVal;
+    }
+
+    public String getPartitionPath() {
+      return partitionPath;
+    }
+
+    public String getRiderValue() {
+      return riderValue;
+    }
   }
 }
diff --git 
a/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip 
b/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip
new file mode 100644
index 00000000000..2e6758c168b
Binary files /dev/null and 
b/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip differ
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 87138797958..93878c35d25 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.format;
 
+import org.apache.hudi.client.model.BootstrapRowData;
 import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.HoodieFlinkRecord;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
@@ -189,7 +191,12 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       ClosableIterator<RowData> skeletonFileIterator,
       Schema skeletonRequiredSchema,
       ClosableIterator<RowData> dataFileIterator,
-      Schema dataRequiredSchema) {
+      Schema dataRequiredSchema,
+      List<Pair<String, Object>> partitionFieldAndValues) {
+    Map<Integer, Object> partitionOrdinalToValues = 
partitionFieldAndValues.stream()
+        .collect(Collectors.toMap(
+            pair -> dataRequiredSchema.getField(pair.getKey()).pos(),
+            Pair::getValue));
     return new ClosableIterator<RowData>() {
       final JoinedRowData joinedRow = new JoinedRowData();
       @Override
@@ -204,12 +211,19 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
       @Override
       public RowData next() {
         RowData skeletonRow = skeletonFileIterator.next();
-        RowData dataRow = dataFileIterator.next();
+        RowData dataRow = appendPartitionFields(dataFileIterator.next());
         joinedRow.setRowKind(dataRow.getRowKind());
         joinedRow.replace(skeletonRow, dataRow);
         return joinedRow;
       }
 
+      private RowData appendPartitionFields(RowData dataRow) {
+        if (partitionFieldAndValues.isEmpty()) {
+          return dataRow;
+        }
+        return new BootstrapRowData(dataRow, partitionOrdinalToValues);
+      }
+
       @Override
       public void close() {
         skeletonFileIterator.close();
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index cfb04082541..a9830b2ca60 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
@@ -241,9 +242,13 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
   public ClosableIterator<ArrayWritable> 
mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator,
                                                                Schema 
skeletonRequiredSchema,
                                                                
ClosableIterator<ArrayWritable> dataFileIterator,
-                                                               Schema 
dataRequiredSchema) {
+                                                               Schema 
dataRequiredSchema,
+                                                               
List<Pair<String, Object>> partitionFieldsAndValues) {
     int skeletonLen = skeletonRequiredSchema.getFields().size();
     int dataLen = dataRequiredSchema.getFields().size();
+    int[] partitionFieldPositions = partitionFieldsAndValues.stream()
+        .map(pair -> 
dataRequiredSchema.getField(pair.getKey()).pos()).mapToInt(Integer::intValue).toArray();
+    Writable[] convertedPartitionValues = 
partitionFieldsAndValues.stream().map(Pair::getValue).toArray(Writable[]::new);
     return new ClosableIterator<ArrayWritable>() {
 
       private final ArrayWritable returnWritable = new 
ArrayWritable(Writable.class);
@@ -260,6 +265,11 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
       public ArrayWritable next() {
         Writable[] skeletonWritable = skeletonFileIterator.next().get();
         Writable[] dataWritable = dataFileIterator.next().get();
+        for (int i = 0; i < partitionFieldPositions.length; i++) {
+          if (dataWritable[partitionFieldPositions[i]] == null) {
+            dataWritable[partitionFieldPositions[i]] = 
convertedPartitionValues[i];
+          }
+        }
         Writable[] mergedWritable = new Writable[skeletonLen + dataLen];
         System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen);
         System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen, 
dataLen);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index 23d6c0cb511..fd637490db8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -59,7 +59,7 @@ public class ObjectInspectorCache {
     // eg: current table is col1, col2, col3; 
jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 
,BLOCK__OFFSET__INSIDE__FILE ...
     Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f 
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
     List<String> columnNameList = 
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
-    List<TypeInfo> columnTypeList =  
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+    List<TypeInfo> columnTypeList = 
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
 
     int columnNameListLen = columnNameList.size() - 1;
     for (int i = columnNameListLen; i >= 0; i--) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
index 3d4e87ef289..d0d045da593 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
@@ -79,13 +79,13 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
     val data = sparkSession.sparkContext.parallelize(rows)
     val oldRow = sparkSession.createDataFrame(data, 
mergedSchema).queryExecution.toRdd.first()
 
-    val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, 
schema1, JCollections.emptyMap())
+    val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, 
schema1, JCollections.emptyMap(), JCollections.emptyMap())
     val newRow1 = rowWriter1(oldRow)
 
     val serDe1 = sparkAdapter.createSparkRowSerDe(schema1)
     assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18, 
Row("Mission st", "SF")));
 
-    val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, 
schema2, JCollections.emptyMap())
+    val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, 
schema2, JCollections.emptyMap(), JCollections.emptyMap())
     val newRow2 = rowWriter2(oldRow)
 
     val serDe2 = sparkAdapter.createSparkRowSerDe(schema2)
@@ -95,13 +95,35 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
   test("Test simple rewriting (with nullable value)") {
     val data = sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18, 
null.asInstanceOf[StructType])))
     val oldRow = sparkSession.createDataFrame(data, 
schema1).queryExecution.toRdd.first()
-    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, 
mergedSchema, JCollections.emptyMap())
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, 
mergedSchema, JCollections.emptyMap(), JCollections.emptyMap())
     val newRow = rowWriter(oldRow)
 
     val serDe = sparkAdapter.createSparkRowSerDe(mergedSchema)
     assertEquals(serDe.deserializeRow(newRow), Row("Rob", 18, 
null.asInstanceOf[StructType], null.asInstanceOf[StringType], 
null.asInstanceOf[IntegerType]))
   }
 
+  test("Test rewriting with field value injections") {
+    val rowWithNull = Seq(
+      Row("Andrew", null, Row("Mission st", "SF"), "John", 19)
+    )
+    val oldRow = 
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rowWithNull),
 mergedSchema).queryExecution.toRdd.first()
+
+    val updatedValuesMap: java.util.Map[Integer, Object] = 
JCollections.singletonMap(1, 18).asInstanceOf[java.util.Map[Integer, Object]]
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema, 
schema1, JCollections.emptyMap(), updatedValuesMap)
+    val newRow1 = rowWriter(oldRow)
+
+    val serDe = sparkAdapter.createSparkRowSerDe(schema1)
+    assertEquals(serDe.deserializeRow(newRow1), Row("Andrew", 18, Row("Mission 
st", "SF")));
+
+    // non-null value should not be rewritten
+    val rowWithoutNull = Seq(
+      Row("Andrew", 25, Row("Mission st", "SF"), "John", 19)
+    )
+    val oldRow2 = 
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rowWithoutNull),
 mergedSchema).queryExecution.toRdd.first()
+    val newRow2 = rowWriter(oldRow2)
+    assertEquals(serDe.deserializeRow(newRow2), Row("Andrew", 25, Row("Mission 
st", "SF")));
+  }
+
   /**
    * test record data type changes.
    * int => long/float/double/string
@@ -192,7 +214,7 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
     val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, 
newStructTypeSchema)
       .apply(newRecord).get
 
-    val rowWriter = 
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, 
newStructTypeSchema, new HashMap[String, String])
+    val rowWriter = 
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, 
newStructTypeSchema, JCollections.emptyMap(), JCollections.emptyMap())
     val newRow = rowWriter(row)
 
     internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
@@ -247,7 +269,7 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
     val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, 
structTypeSchema).apply(avroRecord).get
     val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, 
newStructTypeSchema).apply(newAvroRecord).get
 
-    val rowWriter = 
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, 
newStructTypeSchema, new HashMap[String, String])
+    val rowWriter = 
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema, 
newStructTypeSchema, JCollections.emptyMap(), JCollections.emptyMap())
     val newRow = rowWriter(row)
 
     internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
diff --git a/pom.xml b/pom.xml
index 20a4ca00014..a56b62d3890 100644
--- a/pom.xml
+++ b/pom.xml
@@ -713,6 +713,7 @@
               <exclude>**/test/resources/*.commit</exclude>
               <exclude>**/test/resources/**/*.txt</exclude>
               <exclude>**/test/resources/**/*.avsc</exclude>
+              <exclude>**/test/resources/**/*.zip</exclude>
               <exclude>**/target/**</exclude>
               <exclude>**/generated-sources/**</exclude>
               <exclude>.github/**</exclude>

Reply via email to