This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0fcbf6eaf3 [HUDI-9318] Refactor the log records presentation in 
FileGroupRecordBuffer (#13225)
f0fcbf6eaf3 is described below

commit f0fcbf6eaf39dfe79e2b27ff7d626b0a8c06bce0
Author: Shuo Cheng <[email protected]>
AuthorDate: Mon Apr 28 13:50:29 2025 +0800

    [HUDI-9318] Refactor the log records presentation in FileGroupRecordBuffer 
(#13225)
---
 .../hudi/client/model/AbstractHoodieRowData.java   |   2 +-
 .../hudi/client/model/HoodieFlinkRecord.java       |  27 +--
 .../model/HoodieRowDataWithUpdatedMetaField.java   |  57 +++++
 .../apache/hudi/util/RowDataAvroQueryContexts.java |  11 +
 .../hudi/common/model/HoodieSparkRecord.java       |  20 +-
 .../hudi/BaseSparkInternalRowReaderContext.java    |  23 +-
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |  14 ++
 .../apache/hudi/avro/HoodieAvroReaderContext.java  |  20 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  92 ++------
 .../hudi/common/model/HoodieAvroIndexedRecord.java |   2 +-
 .../apache/hudi/common/model/HoodieAvroRecord.java |   2 +-
 .../hudi/common/model/HoodieEmptyRecord.java       |   2 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |  25 ++-
 .../table/log/HoodieMergedLogRecordReader.java     |   8 +-
 .../hudi/common/table/read/BufferedRecord.java     | 110 +++++++++
 .../common/table/read/FileGroupRecordBuffer.java   | 247 +++++++++------------
 .../table/read/HoodieFileGroupRecordBuffer.java    |  15 +-
 .../table/read/KeyBasedFileGroupRecordBuffer.java  |  61 ++---
 .../read/PositionBasedFileGroupRecordBuffer.java   |  75 +++----
 .../table/read/UnmergedFileGroupRecordBuffer.java  |  22 +-
 .../table/read/TestFileGroupRecordBuffer.java      |  35 ++-
 .../table/read/TestHoodieFileGroupReaderBase.java  |  39 ++--
 .../table/format/FlinkRowDataReaderContext.java    |  55 +++--
 .../hudi/hadoop/HiveHoodieReaderContext.java       |  20 +-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |   2 +-
 .../TestPositionBasedFileGroupRecordBuffer.java    |   8 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  15 +-
 27 files changed, 526 insertions(+), 483 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
index 7f3217f339b..5c92ebb1500 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/AbstractHoodieRowData.java
@@ -161,7 +161,7 @@ public abstract class AbstractHoodieRowData implements 
RowData {
     return row.getMap(rebaseOrdinal(ordinal));
   }
 
-  private String getMetaColumnVal(int ordinal) {
+  protected String getMetaColumnVal(int ordinal) {
     return this.metaColumns[ordinal];
   }
 
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 376606f0e21..c3d9ef4364d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
@@ -53,7 +52,6 @@ import static 
org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
  * Flink Engine-specific Implementations of `HoodieRecord`, which is expected 
to hold {@code RowData} as payload.
  */
 public class HoodieFlinkRecord extends HoodieRecord<RowData> {
-  private Comparable<?> orderingValue;
 
   public HoodieFlinkRecord(RowData rowData) {
     super(null, rowData);
@@ -96,16 +94,13 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
-    if (this.orderingValue == null) {
-      String orderingField = ConfigUtils.getOrderingField(props);
-      if (isNullOrEmpty(orderingField)) {
-        this.orderingValue = DEFAULT_ORDERING_VALUE;
-      } else {
-        this.orderingValue = (Comparable<?>) 
getColumnValueAsJava(recordSchema, orderingField, props, false);
-      }
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
+    String orderingField = ConfigUtils.getOrderingField(props);
+    if (isNullOrEmpty(orderingField)) {
+      return DEFAULT_ORDERING_VALUE;
+    } else {
+      return (Comparable<?>) getColumnValueAsJava(recordSchema, orderingField, 
props, false);
     }
-    return this.orderingValue;
   }
 
   @Override
@@ -170,11 +165,11 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
 
   @Override
   public HoodieRecord updateMetaField(Schema recordSchema, int ordinal, String 
value) {
-    
ValidationUtils.checkArgument(recordSchema.getField(RECORD_KEY_METADATA_FIELD) 
!= null,
-        "The record is expected to contain metadata fields.");
-    GenericRowData rowData = (GenericRowData) getData();
-    rowData.setField(ordinal, StringData.fromString(value));
-    return this;
+    String[] metaVals = new String[HoodieRecord.HOODIE_META_COLUMNS.size()];
+    metaVals[ordinal] = value;
+    boolean withOperation = recordSchema.getField(OPERATION_METADATA_FIELD) != 
null;
+    RowData rowData = new HoodieRowDataWithUpdatedMetaField(metaVals, ordinal, 
getData(), withOperation);
+    return new HoodieFlinkRecord(getKey(), getOperation(), orderingValue, 
rowData);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java
new file mode 100644
index 00000000000..9d66201dc82
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieRowDataWithUpdatedMetaField.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+/**
+ * RowData implementation for Hoodie Row. It wraps a {@link RowData} and 
keeps meta columns locally,
+ * but the meta columns array only contains one updated meta field, e.g., 
updated `FILENAME_METADATA_FIELD`
+ * for base file writing during compaction.
+ */
+public class HoodieRowDataWithUpdatedMetaField extends 
HoodieRowDataWithMetaFields {
+  private final int updatedMetaOrdinal;
+
+  public HoodieRowDataWithUpdatedMetaField(
+      String[] metaVals,
+      int updatedMetaOrdinal,
+      RowData row,
+      boolean withOperation) {
+    super(metaVals[0], metaVals[1], metaVals[2], metaVals[3], metaVals[4], 
row, withOperation);
+    this.updatedMetaOrdinal = updatedMetaOrdinal;
+  }
+
+  @Override
+  public boolean isNullAt(int ordinal) {
+    if (updatedMetaOrdinal == ordinal) {
+      return null == getMetaColumnVal(ordinal);
+    } else {
+      return row.isNullAt(rebaseOrdinal(ordinal));
+    }
+  }
+
+  @Override
+  public StringData getString(int ordinal) {
+    if (updatedMetaOrdinal == ordinal) {
+      return StringData.fromString(getMetaColumnVal(ordinal));
+    }
+    return row.getString(rebaseOrdinal(ordinal));
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
index bf7875cb892..055d41f1d61 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
 
 import org.apache.avro.Schema;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,6 +40,9 @@ import java.util.function.Function;
 public class RowDataAvroQueryContexts {
   private static final Map<Schema, RowDataQueryContext> QUERY_CONTEXT_MAP = 
new ConcurrentHashMap<>();
 
+  // The BinaryRowWriter in RowDataSerializer is reused and is not thread-safe, so serializers are cached per thread.
+  private static final ThreadLocal<Map<Schema, RowDataSerializer>> 
ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
+
   public static RowDataQueryContext fromAvroSchema(Schema avroSchema) {
     return fromAvroSchema(avroSchema, true);
   }
@@ -62,6 +66,13 @@ public class RowDataAvroQueryContexts {
     });
   }
 
+  public static RowDataSerializer getRowDataSerializer(Schema avroSchema) {
+    return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(avroSchema, schema 
-> {
+      RowType rowType = (RowType) 
fromAvroSchema(schema).getRowType().getLogicalType();
+      return new RowDataSerializer(rowType);
+    });
+  }
+
   public static class RowDataQueryContext {
     private final DataType rowType;
     private final Map<String, FieldQueryContext> contextMap;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index e6b336f9870..d94888fd83b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -324,20 +324,18 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
     StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
     String orderingField = ConfigUtils.getOrderingField(props);
-    if (isNullOrEmpty(orderingField)) {
-      return DEFAULT_ORDERING_VALUE;
-    }
-    scala.Option<NestedFieldPath> cachedNestedFieldPath =
-        HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
-    if (cachedNestedFieldPath.isDefined()) {
-      NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
-      return (Comparable<?>) 
HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
-    } else {
-      return DEFAULT_ORDERING_VALUE;
+    if (!isNullOrEmpty(orderingField)) {
+      scala.Option<NestedFieldPath> cachedNestedFieldPath =
+          HoodieInternalRowUtils.getCachedPosList(structType, orderingField);
+      if (cachedNestedFieldPath.isDefined()) {
+        NestedFieldPath nestedFieldPath = cachedNestedFieldPath.get();
+        return (Comparable<?>) 
HoodieUnsafeRowUtils.getNestedInternalRowValue(data, nestedFieldPath);
+      }
     }
+    return DEFAULT_ORDERING_VALUE;
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 8df5bdf57b7..92478ee19fc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -36,6 +37,7 @@ import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -93,17 +95,15 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
   }
 
   @Override
-  public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> 
rowOption,
-                                                         Map<String, Object> 
metadataMap) {
-    if (!rowOption.isPresent()) {
+  public HoodieRecord<InternalRow> 
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
       return new HoodieEmptyRecord<>(
-          new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-              (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
+          new HoodieKey(bufferedRecord.getRecordKey(), null),
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
-    Schema schema = getSchemaFromMetadata(metadataMap);
-    InternalRow row = rowOption.get();
+    Schema schema = getSchemaFromBufferRecord(bufferedRecord);
+    InternalRow row = bufferedRecord.getRecord();
     return new HoodieSparkRecord(row, 
HoodieInternalRowUtils.getCachedSchema(schema));
   }
 
@@ -112,6 +112,15 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     return internalRow.copy();
   }
 
+  @Override
+  public InternalRow toBinaryRow(Schema schema, InternalRow internalRow) {
+    if (internalRow instanceof UnsafeRow) {
+      return internalRow;
+    }
+    final UnsafeProjection unsafeProjection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
+    return unsafeProjection.apply(internalRow);
+  }
+
   private Object getFieldValueFromInternalRow(InternalRow row, Schema 
recordSchema, String fieldName) {
     StructType structType = getCachedSchema(recordSchema);
     scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 2a0c2568cdb..519519fea37 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -60,6 +60,12 @@ object HoodieInternalRowUtils {
         new mutable.HashMap[(StructType, StructType), UnsafeProjection]
     })
 
+  private val identicalUnsafeProjectionThreadLocal: 
ThreadLocal[mutable.HashMap[Schema, UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[Schema, 
UnsafeProjection]] {
+      override def get(): mutable.HashMap[Schema, UnsafeProjection] =
+        new mutable.HashMap[Schema, UnsafeProjection]
+    })
+
   private val schemaMap = new ConcurrentHashMap[Schema, StructType]
   private val orderPosListMap = new ConcurrentHashMap[(StructType, String), 
Option[NestedFieldPath]]
 
@@ -75,6 +81,14 @@ object HoodieInternalRowUtils {
       .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
+  /**
+   * Provides cached instance of [[UnsafeProjection]] to project Java object 
based [[InternalRow]] to [[UnsafeRow]].
+   */
+  def getCachedUnsafeProjection(schema: Schema): UnsafeProjection = {
+    identicalUnsafeProjectionThreadLocal.get()
+      .getOrElseUpdate(schema, 
UnsafeProjection.create(getCachedSchema(schema)))
+  }
+
   /**
    * Provides cached instance of [[UnsafeRowWriter]] transforming provided 
[[InternalRow]]s from
    * one [[StructType]] and into another [[StructType]]
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 13831b62c96..613a21159de 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
@@ -160,17 +161,15 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
   }
 
   @Override
-  public HoodieRecord constructHoodieRecord(
-      Option<IndexedRecord> recordOpt,
-      Map<String, Object> metadataMap) {
-    if (!recordOpt.isPresent()) {
+  public HoodieRecord<IndexedRecord> 
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
       return SpillableMapUtils.generateEmptyPayload(
-          (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-          (String) metadataMap.get(INTERNAL_META_PARTITION_PATH),
-          (Comparable<?>) metadataMap.get(INTERNAL_META_ORDERING_FIELD),
+          bufferedRecord.getRecordKey(),
+          null,
+          bufferedRecord.getOrderingValue(),
           payloadClass);
     }
-    return new HoodieAvroIndexedRecord(recordOpt.get());
+    return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
   }
 
   @Override
@@ -178,6 +177,11 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
     return record;
   }
 
+  @Override
+  public IndexedRecord toBinaryRow(Schema avroSchema, IndexedRecord record) {
+    return record;
+  }
+
   @Override
   public ClosableIterator<IndexedRecord> 
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
                                                                Schema 
skeletonRequiredSchema,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 88d6ca971d3..62249b384cd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.LocalAvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -39,7 +40,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;
 
@@ -151,15 +151,6 @@ public abstract class HoodieReaderContext<T> {
     return storageConfiguration;
   }
 
-  // These internal key names are only used in memory for record metadata and 
merging,
-  // and should not be persisted to storage.
-  public static final String INTERNAL_META_RECORD_KEY = "_0";
-  public static final String INTERNAL_META_PARTITION_PATH = "_1";
-  public static final String INTERNAL_META_ORDERING_FIELD = "_2";
-  public static final String INTERNAL_META_OPERATION = "_3";
-  public static final String INTERNAL_META_INSTANT_TIME = "_4";
-  public static final String INTERNAL_META_SCHEMA_ID = "_5";
-
   /**
    * Gets the record iterator based on the type of engine-specific record 
representation from the
    * file.
@@ -251,39 +242,30 @@ public abstract class HoodieReaderContext<T> {
   /**
    * Gets the ordering value in particular type.
    *
-   * @param recordOption An option of record.
-   * @param metadataMap  A map containing the record metadata.
-   * @param schema       The Avro schema of the record.
+   * @param record The record in engine-specific type.
+   * @param schema The Avro schema of the record.
    * @param orderingFieldName name of the ordering field
    * @return The ordering value.
    */
-  public Comparable getOrderingValue(Option<T> recordOption,
-                                     Map<String, Object> metadataMap,
+  public Comparable getOrderingValue(T record,
                                      Schema schema,
                                      Option<String> orderingFieldName) {
-    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-    }
-
-    if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
+    if (orderingFieldName.isEmpty()) {
       return DEFAULT_ORDERING_VALUE;
     }
 
-    Object value = getValue(recordOption.get(), schema, 
orderingFieldName.get());
+    Object value = getValue(record, schema, orderingFieldName.get());
     Comparable finalOrderingVal = value != null ? 
convertValueToEngineType((Comparable) value) : DEFAULT_ORDERING_VALUE;
-    metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
     return finalOrderingVal;
   }
 
   /**
-   * Constructs a new {@link HoodieRecord} based on the record of 
engine-specific type and metadata for merging.
+   * Constructs a new {@link HoodieRecord} based on the given buffered record 
{@link BufferedRecord}.
    *
-   * @param recordOption An option of the record in engine-specific type if 
exists.
-   * @param metadataMap  The record metadata.
+   * @param bufferedRecord  The {@link BufferedRecord} object with 
engine-specific row
    * @return A new instance of {@link HoodieRecord}.
    */
-  public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
-                                                        Map<String, Object> 
metadataMap);
+  public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> 
bufferedRecord);
 
   /**
    * Seals the engine-specific record to make sure the data referenced in 
memory do not change.
@@ -294,58 +276,24 @@ public abstract class HoodieReaderContext<T> {
   public abstract T seal(T record);
 
   /**
-   * Generates metadata map based on the information.
+   * Converts engine specific row into binary format.
    *
-   * @param recordKey     Record key in String.
-   * @param partitionPath Partition path in String.
-   * @param orderingVal   Ordering value in String.
-   * @return A mapping containing the metadata.
-   */
-  public Map<String, Object> generateMetadataForRecord(
-      String recordKey, String partitionPath, Comparable orderingVal) {
-    Map<String, Object> meta = new HashMap<>();
-    meta.put(INTERNAL_META_RECORD_KEY, recordKey);
-    meta.put(INTERNAL_META_PARTITION_PATH, partitionPath);
-    meta.put(INTERNAL_META_ORDERING_FIELD, orderingVal);
-    return meta;
-  }
-
-  /**
-   * Generates metadata of the record. Only fetches record key that is 
necessary for merging.
+   * @param avroSchema The avro schema of the row
+   * @param record     The engine row
    *
-   * @param record The record.
-   * @param schema The Avro schema of the record.
-   * @return A mapping containing the metadata.
+   * @return row in binary format
    */
-  public Map<String, Object> generateMetadataForRecord(T record, Schema 
schema) {
-    Map<String, Object> meta = new HashMap<>();
-    meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
-    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
-    return meta;
-  }
+  public abstract T toBinaryRow(Schema avroSchema, T record);
 
   /**
-   * Gets the schema encoded in the metadata map
+   * Gets the schema encoded in the buffered record {@code BufferedRecord}.
    *
-   * @param infoMap The record metadata
-   * @return the avro schema if it is encoded in the metadata map, else null
-   */
-  public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
-    return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
-  }
-
-  /**
-   * Updates the schema and reset the ordering value in existing metadata 
mapping of a record.
+   * @param record {@link BufferedRecord} object with engine-specific type
    *
-   * @param meta   Metadata in a mapping.
-   * @param schema New schema to set.
-   * @return The input metadata mapping.
+   * @return The avro schema if it is encoded in the buffered record, else null
    */
-  public Map<String, Object> 
updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
-                                                                       Schema 
schema) {
-    meta.remove(INTERNAL_META_ORDERING_FIELD);
-    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
-    return meta;
+  public Schema getSchemaFromBufferRecord(BufferedRecord<T> record) {
+    return decodeAvroSchema(record.getSchemaId());
   }
 
   /**
@@ -417,7 +365,7 @@ public abstract class HoodieReaderContext<T> {
   /**
    * Encodes the given avro schema for efficient serialization.
    */
-  private Integer encodeAvroSchema(Schema schema) {
+  public Integer encodeAvroSchema(Schema schema) {
     return this.localAvroSchemaCache.cacheSchema(schema);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index f4c53543103..8797b428d0b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -203,7 +203,7 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
     String orderingField = ConfigUtils.getOrderingField(props);
     if (isNullOrEmpty(orderingField)) {
       return DEFAULT_ORDERING_VALUE;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index e4782ff5b2c..a3b9e5e521f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -96,7 +96,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> 
extends HoodieRecor
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
     return this.getData().getOrderingValue();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index f482b21e56e..2d5dda75d1a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -58,7 +58,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
     return orderingVal;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 97aff9db70a..601b62450b3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -158,6 +158,8 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
    */
   protected Option<Map<String, String>> metaData;
 
+  protected transient Comparable<?> orderingValue;
+
   public HoodieRecord(HoodieKey key, T data) {
     this(key, data, null, Option.empty());
   }
@@ -211,7 +213,28 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
     return operation;
   }
 
-  public abstract Comparable<?> getOrderingValue(Schema recordSchema, 
Properties props);
+  /**
+   * Gets the ordering value for the record from the cached variable, or extracts it 
from the record if not cached.
+   *
+   * @param recordSchema Avro schema for the record
+   * @param props Properties containing the necessary configurations
+   * @return The ordering value for the record
+   */
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+    if (orderingValue == null) {
+      orderingValue = doGetOrderingValue(recordSchema, props);
+    }
+    return orderingValue;
+  }
+
+  /**
+   * Extracts the ordering value from the record.
+   *
+   * @param recordSchema Avro schema for the record
+   * @param props Properties containing the necessary configurations
+   * @return The ordering value for the record
+   */
+  protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema, 
Properties props);
 
   public T getData() {
     if (data == null) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 91ef3681120..11d32dbc5f1 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -22,11 +22,11 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
@@ -51,7 +51,7 @@ import static 
org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
  * @param <T> type of engine-specific record representation.
  */
 public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
-    implements Iterable<Pair<Option<T>, Map<String, Object>>>, Closeable {
+    implements Iterable<BufferedRecord<T>>, Closeable {
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
   // A timer for calculating elapsed time in millis
   public final HoodieTimer timer = HoodieTimer.create();
@@ -166,11 +166,11 @@ public class HoodieMergedLogRecordReader<T> extends 
BaseHoodieLogRecordReader<T>
   }
 
   @Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> iterator() {
+  public Iterator<BufferedRecord<T>> iterator() {
     return recordBuffer.getLogRecordIterator();
   }
 
-  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getRecords() {
+  public Map<Serializable, BufferedRecord<T>> getRecords() {
     return recordBuffer.getLogRecords();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
new file mode 100644
index 00000000000..3972fd4257e
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+
+/**
+ * Buffered Record used by file group reader.
+ *
+ * @param <T> The type of the engine specific row.
+ */
+public class BufferedRecord<T> implements Serializable {
+  private final String recordKey;
+  private final Comparable orderingValue;
+  private T record;
+  private final Integer schemaId;
+  private final boolean isDelete;
+
+  private BufferedRecord(String recordKey, Comparable orderingValue, T record, 
Integer schemaId, boolean isDelete) {
+    this.recordKey = recordKey;
+    this.orderingValue = orderingValue;
+    this.record = record;
+    this.schemaId = schemaId;
+    this.isDelete = isDelete;
+  }
+
+  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> 
record, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
+    HoodieKey hoodieKey = record.getKey();
+    String recordKey = hoodieKey == null ? 
readerContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
+    Integer schemaId = readerContext.encodeAvroSchema(schema);
+    boolean isDelete;
+    try {
+      isDelete = record.isDelete(schema, props);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to get isDelete from record.", e);
+    }
+    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, 
props), record.getData(), schemaId, isDelete);
+  }
+
+  public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema 
schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, 
boolean isDelete) {
+    String recordKey = readerContext.getRecordKey(record, schema);
+    Integer schemaId = readerContext.encodeAvroSchema(schema);
+    Comparable orderingValue = readerContext.getOrderingValue(record, schema, 
orderingFieldName);
+    return new BufferedRecord<>(recordKey, orderingValue, record, schemaId, 
isDelete);
+  }
+
+  public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord 
deleteRecord, Comparable orderingValue) {
+    return new BufferedRecord<>(deleteRecord.getRecordKey(), orderingValue, 
null, null, true);
+  }
+
+  public String getRecordKey() {
+    return recordKey;
+  }
+
+  public Comparable getOrderingValue() {
+    return orderingValue;
+  }
+
+  public T getRecord() {
+    return record;
+  }
+
+  public Integer getSchemaId() {
+    return schemaId;
+  }
+
+  public boolean isDelete() {
+    return isDelete;
+  }
+
+  public boolean isCommitTimeOrderingDelete() {
+    return isDelete && getOrderingValue().equals(DEFAULT_ORDERING_VALUE);
+  }
+
+  public BufferedRecord<T> toBinary(HoodieReaderContext<T> readerContext) {
+    if (record != null) {
+      record = 
readerContext.seal(readerContext.toBinaryRow(readerContext.getSchemaFromBufferRecord(this),
 record));
+    }
+    return this;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 3414fc254da..d19ed4a56e3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -60,7 +60,6 @@ import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
@@ -71,8 +70,6 @@ import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
 import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
 import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
@@ -89,12 +86,12 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   protected final Option<HoodieRecordMerger> recordMerger;
   protected final Option<String> payloadClass;
   protected final TypedProperties props;
-  protected final ExternalSpillableMap<Serializable, Pair<Option<T>, 
Map<String, Object>>> records;
+  protected final ExternalSpillableMap<Serializable, BufferedRecord<T>> 
records;
   protected final HoodieReadStats readStats;
   protected final boolean shouldCheckCustomDeleteMarker;
   protected final boolean shouldCheckBuiltInDeleteMarker;
   protected ClosableIterator<T> baseFileIterator;
-  protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
+  protected Iterator<BufferedRecord<T>> logRecordIterator;
   protected T nextRecord;
   protected boolean enablePartialMerging = false;
   protected InternalSchema internalSchema;
@@ -207,7 +204,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   }
 
   @Override
-  public Map<Serializable, Pair<Option<T>, Map<String, Object>>> 
getLogRecords() {
+  public Map<Serializable, BufferedRecord<T>> getLogRecords() {
     return records;
   }
 
@@ -221,7 +218,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   }
 
   @Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() 
{
+  public Iterator<BufferedRecord<T>> getLogRecordIterator() {
     return records.values().iterator();
   }
 
@@ -234,28 +231,22 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
    * Merge two log data records if needed.
    *
    * @param newRecord                  The new incoming record
-   * @param metadata                   The metadata
-   * @param existingRecordMetadataPair The existing record metadata pair
-   *
-   * @return The pair of the record that needs to be updated with and its 
metadata,
-   * returns empty to skip the update.
+   * @param existingRecord             The existing record
+   * @return the {@link BufferedRecord} that the buffer needs to be updated with, or 
empty to skip the update.
    */
-  protected Option<Pair<Option<T>, Map<String, Object>>> 
doProcessNextDataRecord(T newRecord,
-                                                                               
  Map<String, Object> metadata,
-                                                                               
  Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
+  protected Option<BufferedRecord<T>> 
doProcessNextDataRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord)
       throws IOException {
     totalLogRecords++;
-    if (existingRecordMetadataPair != null) {
+    if (existingRecord != null) {
       if (enablePartialMerging) {
         // TODO(HUDI-7843): decouple the merging logic from the merger
         //  and use the record merge mode to control how to merge partial 
updates
         // Merge and store the combined record
         Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().partialMerge(
-            readerContext.constructHoodieRecord(
-                existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-            
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
-            readerContext.constructHoodieRecord(Option.of(newRecord), 
metadata),
-            readerContext.getSchemaFromMetadata(metadata),
+            readerContext.constructHoodieRecord(existingRecord),
+            readerContext.getSchemaFromBufferRecord(existingRecord),
+            readerContext.constructHoodieRecord(newRecord),
+            readerContext.getSchemaFromBufferRecord(newRecord),
             readerSchema,
             props);
         if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -265,56 +256,49 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
 
         // If pre-combine returns existing record, no need to update it
-        if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().orElse(null)) {
-          return Option.of(Pair.of(
-              Option.ofNullable(combinedRecord.getData()),
-              
readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, 
combinedRecordAndSchema.getRight())));
+        if (combinedRecord.getData() != existingRecord.getRecord()) {
+          return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), readerContext, props));
         }
         return Option.empty();
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+            return Option.of(newRecord);
           case EVENT_TIME_ORDERING:
-            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
-              return Option.of(Pair.of(Option.of(newRecord), metadata));
+            if (shouldKeepNewerRecord(existingRecord, newRecord)) {
+              return Option.of(newRecord);
             }
             return Option.empty();
           case CUSTOM:
           default:
-            // Merge and store the combined record
-            if (payloadClass.isPresent()) {
-              if (existingRecordMetadataPair.getLeft().isEmpty()
-                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
+            if (existingRecord.isDelete() || newRecord.isDelete()) {
+              if (shouldKeepNewerRecord(existingRecord, newRecord)) {
                 // IMPORTANT:
                 // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
                 // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
-                return Option.of(Pair.of(Option.of(newRecord), metadata));
+                return Option.of(newRecord);
+              } else {
+                return Option.empty();
               }
-              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
-                  getMergedRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.of(newRecord), metadata);
+            }
+            // Merge and store the combined record
+            if (payloadClass.isPresent()) {
+              Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
getMergedRecord(existingRecord, newRecord);
               if (combinedRecordAndSchemaOpt.isPresent()) {
                 T combinedRecordData = 
readerContext.convertAvroRecord((IndexedRecord) 
combinedRecordAndSchemaOpt.get().getLeft().getData());
                 // If pre-combine does not return existing record, update it
-                if (combinedRecordData != 
existingRecordMetadataPair.getLeft().orElse(null)) {
-                  return 
Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
+                if (combinedRecordData != existingRecord.getRecord()) {
+                  Pair<HoodieRecord, Schema> combinedRecordAndSchema = 
combinedRecordAndSchemaOpt.get();
+                  return 
Option.of(BufferedRecord.forRecordWithContext(combinedRecordData, 
combinedRecordAndSchema.getRight(), readerContext, orderingFieldName, false));
                 }
               }
               return Option.empty();
             } else {
-              if (existingRecordMetadataPair.getLeft().isEmpty()
-                  && 
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), Option.ofNullable(newRecord), metadata)) 
{
-                // IMPORTANT:
-                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
-                // return Option.empty when the old payload data is empty(a 
delete) and ignores its ordering value directly.
-                return Option.of(Pair.of(Option.of(newRecord), metadata));
-              }
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().merge(
-                  readerContext.constructHoodieRecord(
-                      existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-                  
readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
-                  readerContext.constructHoodieRecord(Option.of(newRecord), 
metadata),
-                  readerContext.getSchemaFromMetadata(metadata),
+                  readerContext.constructHoodieRecord(existingRecord),
+                  readerContext.getSchemaFromBufferRecord(existingRecord),
+                  readerContext.constructHoodieRecord(newRecord),
+                  readerContext.getSchemaFromBufferRecord(newRecord),
                   props);
 
               if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -325,8 +309,8 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
               HoodieRecord<T> combinedRecord = 
combinedRecordAndSchema.getLeft();
 
               // If pre-combine returns existing record, no need to update it
-              if (combinedRecord.getData() != 
existingRecordMetadataPair.getLeft().orElse(null)) {
-                return 
Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
+              if (combinedRecord.getData() != existingRecord.getRecord()) {
+                return 
Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), readerContext, props));
               }
               return Option.empty();
             }
@@ -337,7 +321,7 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
       // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
       //       it since these records will be put into records(Map).
-      return Option.of(Pair.of(Option.ofNullable(newRecord), metadata));
+      return Option.of(newRecord);
     }
   }
 
@@ -345,26 +329,23 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
    * Merge a delete record with another record (data, or delete).
    *
    * @param deleteRecord               The delete record
-   * @param existingRecordMetadataPair The existing record metadata pair
+   * @param existingRecord             The existing {@link BufferedRecord}
    *
    * @return The option of new delete record that needs to be updated with.
    */
-  protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord 
deleteRecord,
-                                                            Pair<Option<T>, 
Map<String, Object>> existingRecordMetadataPair) {
+  protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord 
deleteRecord, BufferedRecord<T> existingRecord) {
     totalLogRecords++;
-    if (existingRecordMetadataPair != null) {
+    if (existingRecord != null) {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
           return Option.of(deleteRecord);
         case EVENT_TIME_ORDERING:
         case CUSTOM:
         default:
-          Comparable existingOrderingVal = readerContext.getOrderingValue(
-              existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight(), readerSchema,
-              orderingFieldName);
-          if 
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), 
existingOrderingVal)) {
+          if (existingRecord.isCommitTimeOrderingDelete()) {
             return Option.empty();
           }
+          Comparable existingOrderingVal = existingRecord.getOrderingValue();
           Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
           // Checks the ordering value does not equal to 0
           // because we use 0 as the default value which means natural order
@@ -432,68 +413,60 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   /**
    * Merge two records using the configured record merger.
    *
-   * @param older
-   * @param olderInfoMap
-   * @param newer
-   * @param newerInfoMap
-   * @return
+   * @param olderRecord  old {@link BufferedRecord}
+   * @param newerRecord  newer {@link BufferedRecord}
+   * @return a pair whose left is the boolean {@code isDelete} flag and whose right is the 
engine row.
    * @throws IOException
    */
-  protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
-                            Option<T> newer, Map<String, Object> newerInfoMap) 
throws IOException {
-    if (!older.isPresent()) {
-      return isDeleteRecord(newer, 
readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
-    }
-
+  protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
     if (enablePartialMerging) {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
       Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), 
readerContext.getSchemaFromMetadata(olderInfoMap),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), 
readerContext.getSchemaFromMetadata(newerInfoMap),
+          readerContext.constructHoodieRecord(olderRecord), 
readerContext.getSchemaFromBufferRecord(olderRecord),
+          readerContext.constructHoodieRecord(newerRecord), 
readerContext.getSchemaFromBufferRecord(newerRecord),
           readerSchema, props);
 
       if (mergedRecord.isPresent()
           && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+        HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
         if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
+          T data = (T) 
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, 
readerSchema).getData();
+          return Pair.of(false, data);
         }
-        return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+        return Pair.of(false, (T) hoodieRecord.getData());
       }
-      return Option.empty();
+      return Pair.of(true, null);
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return isDeleteRecord(newer, 
readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
+          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
         case EVENT_TIME_ORDERING:
-          Comparable newOrderingValue = readerContext.getOrderingValue(
-              newer, newerInfoMap, readerSchema, orderingFieldName);
-          if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
-            return Option.empty();
+          if (newerRecord.isCommitTimeOrderingDelete()) {
+            return Pair.of(true, newerRecord.getRecord());
           }
-          Comparable oldOrderingValue = readerContext.getOrderingValue(
-              older, olderInfoMap, readerSchema, orderingFieldName);
-          if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
+          Comparable newOrderingValue = newerRecord.getOrderingValue();
+          Comparable oldOrderingValue = olderRecord.getOrderingValue();
+          if (!olderRecord.isCommitTimeOrderingDelete()
               && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return isDeleteRecord(older, 
readerContext.getSchemaFromMetadata(olderInfoMap)) ? Option.empty() : older;
+            return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
           }
-          return isDeleteRecord(newer, 
readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
+          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
         case CUSTOM:
         default:
           if (payloadClass.isPresent()) {
-            if (older.isEmpty() || newer.isEmpty()) {
-              if (shouldKeepNewerRecord(older, olderInfoMap, newer, 
newerInfoMap)) {
+            if (olderRecord.isDelete() || newerRecord.isDelete()) {
+              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
                 // IMPORTANT:
                 // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
                 // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
-                return newer;
+                return Pair.of(newerRecord.isDelete(), 
newerRecord.getRecord());
               } else {
-                return older;
+                return Pair.of(olderRecord.isDelete(), 
olderRecord.getRecord());
               }
             }
-
             Option<Pair<HoodieRecord, Schema>> mergedRecord =
-                getMergedRecord(older, olderInfoMap, newer, newerInfoMap);
+                getMergedRecord(olderRecord, newerRecord);
             if (mergedRecord.isPresent()
                 && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
               IndexedRecord indexedRecord;
@@ -502,32 +475,32 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
               } else {
                 indexedRecord = (IndexedRecord) 
mergedRecord.get().getLeft().getData();
               }
-              return 
Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
+              return Pair.of(false, 
readerContext.convertAvroRecord(indexedRecord));
             }
-            return Option.empty();
+            return Pair.of(true, null);
           } else {
-            if (older.isEmpty() || newer.isEmpty()) {
-              if (shouldKeepNewerRecord(older, olderInfoMap, newer, 
newerInfoMap)) {
+            if (olderRecord.isDelete() || newerRecord.isDelete()) {
+              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
                 // IMPORTANT:
                 // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
                 // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
-                return newer;
+                return Pair.of(newerRecord.isDelete(), 
newerRecord.getRecord());
               } else {
-                return older;
+                return Pair.of(olderRecord.isDelete(), 
olderRecord.getRecord());
               }
             }
             Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), 
readerContext.getSchemaFromMetadata(olderInfoMap),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), 
readerContext.getSchemaFromMetadata(newerInfoMap), props);
+                readerContext.constructHoodieRecord(olderRecord), 
readerContext.getSchemaFromBufferRecord(olderRecord),
+                readerContext.constructHoodieRecord(newerRecord), 
readerContext.getSchemaFromBufferRecord(newerRecord), props);
             if (mergedRecord.isPresent()
                 && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+              HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
               if (!mergedRecord.get().getRight().equals(readerSchema)) {
-                return Option.ofNullable((T) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData());
+                return Pair.of(false, (T) 
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, 
readerSchema).getData());
               }
-              return Option.ofNullable((T) 
mergedRecord.get().getLeft().getData());
+              return Pair.of(false, (T) hoodieRecord.getData());
             }
-
-            return Option.empty();
+            return Pair.of(true, null);
           }
       }
     }
@@ -536,24 +509,22 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
   /**
    * Decides whether to keep the incoming record with ordering value 
comparison.
    */
-  private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> 
oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) {
-    Comparable newOrderingVal = readerContext.getOrderingValue(newVal, 
newMetadata, readerSchema, orderingFieldName);
-    if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) {
+  private boolean shouldKeepNewerRecord(BufferedRecord<T> oldRecord, 
BufferedRecord<T> newRecord) {
+    if (newRecord.isCommitTimeOrderingDelete()) {
       // handle records coming from DELETE statements(the orderingVal is 
constant 0)
       return true;
     }
-    Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, 
oldMetadata, readerSchema, orderingFieldName);
-    return newOrderingVal.compareTo(oldOrderingVal) >= 0;
+    return 
newRecord.getOrderingValue().compareTo(oldRecord.getOrderingValue()) >= 0;
   }
 
-  private Option<Pair<HoodieRecord, Schema>> getMergedRecord(Option<T> older, 
Map<String, Object> olderInfoMap, Option<T> newer, Map<String, Object> 
newerInfoMap) throws IOException {
+  private Option<Pair<HoodieRecord, Schema>> getMergedRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
     ValidationUtils.checkArgument(!Objects.equals(payloadClass, 
OverwriteWithLatestAvroPayload.class.getCanonicalName())
         && !Objects.equals(payloadClass, 
DefaultHoodieRecordPayload.class.getCanonicalName()));
-    HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, 
older, olderInfoMap);
-    HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, 
newer, newerInfoMap);
+    HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, 
olderRecord);
+    HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, 
newerRecord);
     Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-        oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, 
olderInfoMap),
-        newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, 
newerInfoMap), props);
+        oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, 
olderRecord),
+        newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, 
newerRecord), props);
     return mergedRecord;
   }
 
@@ -561,38 +532,34 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
    * Constructs a new {@link HoodieAvroRecord} for payload based merging
    *
    * @param readerContext reader context
-   * @param recordOption An option of the record in engine-specific type if 
exists.
-   * @param metadataMap  The record metadata.
+   * @param bufferedRecord buffered record
    * @return A new instance of {@link HoodieRecord}.
    */
-  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> 
readerContext, Option<T> recordOption, Map<String, Object> metadataMap) {
-    Schema recordSchema = readerSchema;
+  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> 
readerContext, BufferedRecord<T> bufferedRecord) {
     GenericRecord record = null;
-    if (recordOption.isPresent()) {
-      recordSchema = readerContext.getSchemaFromMetadata(metadataMap);
-      record = readerContext.convertToAvroRecord(recordOption.get(), 
recordSchema);
+    if (!bufferedRecord.isDelete()) {
+      Schema recordSchema = 
readerContext.getSchemaFromBufferRecord(bufferedRecord);
+      record = readerContext.convertToAvroRecord(bufferedRecord.getRecord(), 
recordSchema);
     }
-    HoodieKey hoodieKey = new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH));
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
     return new HoodieAvroRecord<>(hoodieKey,
-        HoodieRecordUtils.loadPayload(payloadClass.get(), record, 
readerContext.getOrderingValue(recordOption, metadataMap,
-            recordSchema, orderingFieldName)), null);
+        HoodieRecordUtils.loadPayload(payloadClass.get(), record, 
bufferedRecord.getOrderingValue()), null);
   }
 
-  private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, Map<String, 
Object> infoMap) throws IOException {
+  private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, 
BufferedRecord<T> bufferedRecord) throws IOException {
     if (record.isDelete(readerSchema, props)) {
       return readerSchema;
     }
-    return readerContext.getSchemaFromMetadata(infoMap);
+    return readerContext.getSchemaFromBufferRecord(bufferedRecord);
   }
 
-  protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, 
Map<String, Object>> logRecordInfo) throws IOException {
-    Map<String, Object> metadata = 
readerContext.generateMetadataForRecord(baseRecord, readerSchema);
-
+  protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> 
logRecordInfo) throws IOException {
     if (logRecordInfo != null) {
-      Option<T> resultRecord = merge(Option.of(baseRecord), metadata, 
logRecordInfo.getLeft(), logRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
+      BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, 
orderingFieldName, false);
+      Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord, 
logRecordInfo);
+      if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(resultRecord.get());
+        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
         readStats.incrementNumUpdates();
         return true;
       } else {
@@ -608,18 +575,15 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
     return true;
   }
 
-  protected boolean hasNextLogRecord() throws IOException {
+  protected boolean hasNextLogRecord() {
     if (logRecordIterator == null) {
       logRecordIterator = records.values().iterator();
     }
 
     while (logRecordIterator.hasNext()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = 
logRecordIterator.next();
-      Option<T> resultRecord;
-      resultRecord = merge(Option.empty(), Collections.emptyMap(),
-          nextRecordInfo.getLeft(), nextRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
-        nextRecord = readerContext.seal(resultRecord.get());
+      BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
+      if (!nextRecordInfo.isDelete()) {
+        nextRecord = nextRecordInfo.getRecord();
         readStats.incrementNumInserts();
         return true;
       } else {
@@ -655,11 +619,6 @@ public abstract class FileGroupRecordBuffer<T> implements 
HoodieFileGroupRecordB
         : 
readerContext.convertValueToEngineType(deleteRecord.getOrderingValue());
   }
 
-  private boolean isDeleteRecordWithNaturalOrder(Option<T> rowOption,
-                                                 Comparable orderingValue) {
-    return rowOption.isEmpty() && orderingValue.equals(DEFAULT_ORDERING_VALUE);
-  }
-
   private boolean isDeleteRecord(Option<T> record, Schema schema) {
     if (record.isEmpty()) {
       return true;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index a7b9423be2e..cc6930c7a39 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -59,11 +58,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
   /**
    * Process a next record in a log data block.
    *
-   * @param record
-   * @param metadata
+   * @param record Buffered record
+   * @param index  Record key or position
    * @throws Exception
    */
-  void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable index) throws IOException;
+  void processNextDataRecord(BufferedRecord<T> record, Serializable index) 
throws IOException;
 
   /**
    * Process a log delete block, and store the resulting records into the 
buffer.
@@ -75,10 +74,8 @@ public interface HoodieFileGroupRecordBuffer<T> {
 
   /**
    * Process next delete record.
-   *
-   * @param deleteRecord
    */
-  void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable index);
+  void processNextDeletedRecord(DeleteRecord record, Serializable index);
 
   /**
    * Check if a record exists in the buffered records.
@@ -98,12 +95,12 @@ public interface HoodieFileGroupRecordBuffer<T> {
   /**
    * @return An iterator on the log records.
    */
-  Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator();
+  Iterator<BufferedRecord<T>> getLogRecordIterator();
 
   /**
    * @return The underlying data stored in the buffer.
    */
-  Map<Serializable, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+  Map<Serializable, BufferedRecord<T>> getLogRecords();
 
   /**
    * Link the base file iterator for consequential merge.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index d19519a3f86..7a0e1aacd22 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -40,9 +39,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Iterator;
-import java.util.Map;
-
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 
 /**
  * A buffer that is used to store log records by {@link 
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
@@ -82,31 +78,20 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
     try (ClosableIterator<T> recordIterator = 
recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-            nextRecord, schema);
-        String recordKey = (String) 
metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
-
-        if (isBuiltInDeleteRecord(nextRecord) || 
isCustomDeleteRecord(nextRecord)) {
-          processDeleteRecord(nextRecord, metadata);
-        } else {
-          processNextDataRecord(nextRecord, metadata, recordKey);
-        }
+        boolean isDelete = isBuiltInDeleteRecord(nextRecord) || 
isCustomDeleteRecord(nextRecord);
+        BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, 
orderingFieldName, isDelete);
+        processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
       }
     }
   }
 
   @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable recordKey) throws IOException {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordKey);
-    Option<Pair<Option<T>, Map<String, Object>>> mergedRecordAndMetadata =
-        doProcessNextDataRecord(record, metadata, existingRecordMetadataPair);
-
-    if (mergedRecordAndMetadata.isPresent()) {
-      records.put(recordKey, Pair.of(
-          mergedRecordAndMetadata.get().getLeft().isPresent()
-              ? 
Option.ofNullable(readerContext.seal(mergedRecordAndMetadata.get().getLeft().get()))
-              : Option.empty(),
-          mergedRecordAndMetadata.get().getRight()));
+  public void processNextDataRecord(BufferedRecord<T> record, Serializable 
recordKey) throws IOException {
+    BufferedRecord<T> existingRecord = records.get(recordKey);
+    Option<BufferedRecord<T>> bufferRecord = doProcessNextDataRecord(record, 
existingRecord);
+
+    if (bufferRecord.isPresent()) {
+      records.put(recordKey, bufferRecord.get().toBinary(readerContext));
     }
   }
 
@@ -115,36 +100,20 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
     Iterator<DeleteRecord> it = 
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
     while (it.hasNext()) {
       DeleteRecord record = it.next();
-      String recordKey = record.getRecordKey();
-      processNextDeletedRecord(record, recordKey);
+      processNextDeletedRecord(record, record.getRecordKey());
     }
   }
 
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable 
recordKey) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordKey);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecordMetadataPair);
+    BufferedRecord<T> existingRecord = records.get(recordKey);
+    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecord);
     if (recordOpt.isPresent()) {
-      records.put(recordKey, Pair.of(Option.empty(), 
readerContext.generateMetadataForRecord(
-          (String) recordKey, recordOpt.get().getPartitionPath(),
-          getOrderingValue(readerContext, recordOpt.get()))));
+      Comparable orderingValue = getOrderingValue(readerContext, 
recordOpt.get());
+      records.put(recordKey, BufferedRecord.forDeleteRecord(deleteRecord, 
orderingValue));
     }
   }
 
-  protected void processDeleteRecord(T record, Map<String, Object> metadata) {
-    DeleteRecord deleteRecord = DeleteRecord.create(
-        new HoodieKey(
-            (String) metadata.get(INTERNAL_META_RECORD_KEY),
-            // The partition path of the delete record is set to null because 
it is not
-            // used, and the delete record is never surfaced from the file 
group reader
-            null),
-        readerContext.getOrderingValue(
-            Option.of(record), metadata, readerSchema, orderingFieldName));
-    processNextDeletedRecord(
-        deleteRecord,
-        (String) metadata.get(INTERNAL_META_RECORD_KEY));
-  }
-
   @Override
   public boolean containsLogRecord(String recordKey) {
     return records.containsKey(recordKey);
@@ -152,7 +121,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
 
   protected boolean hasNextBaseRecord(T baseRecord) throws IOException {
     String recordKey = readerContext.getRecordKey(baseRecord, readerSchema);
-    Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(recordKey);
+    BufferedRecord<T> logRecordInfo = records.remove(recordKey);
     return hasNextBaseRecord(baseRecord, logRecordInfo);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 6b2f4d2c656..40de6a172cf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.KeySpec;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -45,16 +44,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
-
 /**
  * A buffer that is used to store log records by {@link 
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} 
methods into record position based map.
@@ -136,12 +131,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
 
         long recordPosition = recordPositions.get(recordIndex++);
         T evolvedNextRecord = 
schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
-        Map<String, Object> metadata = 
readerContext.generateMetadataForRecord(evolvedNextRecord, schema);
-        if (isBuiltInDeleteRecord(evolvedNextRecord) || 
isCustomDeleteRecord(evolvedNextRecord)) {
-          processDeleteRecord(evolvedNextRecord, metadata, recordPosition);
-        } else {
-          processNextDataRecord(evolvedNextRecord, metadata, recordPosition);
-        }
+        boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) || 
isCustomDeleteRecord(evolvedNextRecord);
+        BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext, 
orderingFieldName, isDelete);
+        processNextDataRecord(bufferedRecord, recordPosition);
       }
     }
   }
@@ -151,11 +143,11 @@ public class PositionBasedFileGroupRecordBuffer<T> 
extends KeyBasedFileGroupReco
     //need to make a copy of the keys to avoid concurrent modification 
exception
     ArrayList<Serializable> positions = new ArrayList<>(records.keySet());
     for (Serializable position : positions) {
-      Pair<Option<T>, Map<String, Object>> entry = records.get(position);
-      Object recordKey = entry.getRight().get(INTERNAL_META_RECORD_KEY);
-      if (entry.getLeft().isPresent() || recordKey != null) {
+      BufferedRecord<T> entry = records.get(position);
+      String recordKey = entry.getRecordKey();
+      if (!entry.isDelete() || recordKey != null) {
 
-        records.put((String) recordKey, entry);
+        records.put(recordKey, entry);
         records.remove(position);
       } else {
         //if it's a delete record and the key is null, then we need to still 
use positions
@@ -196,9 +188,8 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
           // this delete-vector could be kept in the records cache(see the 
check in #fallbackToKeyBasedBuffer),
           // and these keys would be deleted no matter whether there are 
following-up inserts/updates.
           DeleteRecord deleteRecord = 
deleteRecords[commitTimeBasedRecordIndex++];
-          records.put(recordPosition,
-              Pair.of(Option.empty(), readerContext.generateMetadataForRecord(
-                  deleteRecord.getRecordKey(), "", 
deleteRecord.getOrderingValue())));
+          BufferedRecord<T> record = 
BufferedRecord.forDeleteRecord(deleteRecord, deleteRecord.getOrderingValue());
+          records.put(recordPosition, record);
         }
         return;
       case EVENT_TIME_ORDERING:
@@ -214,34 +205,21 @@ public class PositionBasedFileGroupRecordBuffer<T> 
extends KeyBasedFileGroupReco
     }
   }
 
-  protected void processDeleteRecord(T record, Map<String, Object> metadata, 
long recordPosition) {
-    DeleteRecord deleteRecord = DeleteRecord.create(
-        new HoodieKey(
-            // The partition path of the delete record is set to null because 
it is not
-            // used, and the delete record is never surfaced from the file 
group reader
-            (String) metadata.get(INTERNAL_META_RECORD_KEY), null),
-        readerContext.getOrderingValue(
-            Option.of(record), metadata, readerSchema, orderingFieldName));
-    processNextDeletedRecord(deleteRecord, recordPosition);
-  }
-
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable 
recordPosition) {
-    Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = 
records.get(recordPosition);
-    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecordMetadataPair);
+    BufferedRecord<T> existingRecord = records.get(recordPosition);
+    Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord, 
existingRecord);
     if (recordOpt.isPresent()) {
-      String recordKey = recordOpt.get().getRecordKey();
-      records.put(recordPosition, Pair.of(Option.empty(), 
readerContext.generateMetadataForRecord(
-          recordKey, recordOpt.get().getPartitionPath(),
-          getOrderingValue(readerContext, recordOpt.get()))));
+      Comparable orderingValue = getOrderingValue(readerContext, 
recordOpt.get());
+      records.put(recordPosition, 
BufferedRecord.forDeleteRecord(recordOpt.get(), orderingValue));
     }
   }
 
   @Override
   public boolean containsLogRecord(String recordKey) {
     return records.values().stream()
-        .filter(r -> r.getLeft().isPresent())
-        .map(r -> readerContext.getRecordKey(r.getKey().get(), 
readerSchema)).anyMatch(recordKey::equals);
+        .filter(r -> !r.isDelete())
+        .map(r -> readerContext.getRecordKey(r.getRecord(), 
readerSchema)).anyMatch(recordKey::equals);
   }
 
   @Override
@@ -252,27 +230,26 @@ public class PositionBasedFileGroupRecordBuffer<T> 
extends KeyBasedFileGroupReco
 
     nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema,
         ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-    Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(nextRecordPosition++);
-
-    Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-        baseRecord, readerSchema);
+    BufferedRecord<T> logRecordInfo = records.remove(nextRecordPosition++);
 
-    final Option<T> resultRecord;
+    final Pair<Boolean, T> isDeleteAndRecord;
+    T resultRecord = null;
     if (logRecordInfo != null) {
-      resultRecord = merge(
-          Option.of(baseRecord), metadata, logRecordInfo.getLeft(), 
logRecordInfo.getRight());
-      if (resultRecord.isPresent()) {
+      BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext, 
orderingFieldName, false);
+      isDeleteAndRecord = merge(bufferedRecord, logRecordInfo);
+      if (!isDeleteAndRecord.getLeft()) {
+        resultRecord = isDeleteAndRecord.getRight();
         readStats.incrementNumUpdates();
       } else {
         readStats.incrementNumDeletes();
       }
     } else {
-      resultRecord = merge(Option.empty(), Collections.emptyMap(), 
Option.of(baseRecord), metadata);
+      resultRecord = baseRecord;
       readStats.incrementNumInserts();
     }
 
-    if (resultRecord.isPresent()) {
-      nextRecord = readerContext.seal(resultRecord.get());
+    if (resultRecord != null) {
+      nextRecord = readerContext.seal(resultRecord);
       return true;
     }
     return false;
@@ -283,7 +260,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends 
KeyBasedFileGroupReco
       //see if there is a delete block with record positions
       nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema,
           ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
-      Pair<Option<T>, Map<String, Object>> logRecordInfo  = 
records.remove(nextRecordPosition++);
+      BufferedRecord<T> logRecordInfo  = records.remove(nextRecordPosition++);
       if (logRecordInfo != null) {
         //we have a delete that was not to be able to be converted. Since it 
is the newest version, the record is deleted
         //remove a key based record if it exists
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index 7874b994c2a..91e5e06cbfd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -39,7 +39,6 @@ import org.apache.avro.Schema;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
-import java.util.Map;
 
 public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> 
{
   // Used to order the records in the record map.
@@ -69,14 +68,14 @@ public class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
 
     // Output records based on the index to preserve the order.
     if (!records.isEmpty()) {
-      Pair<Option<T>, Map<String, Object>> nextRecordInfo = 
records.remove(getIndex++);
+      BufferedRecord<T> nextRecordInfo = records.remove(getIndex++);
 
       if (nextRecordInfo == null) {
         throw new HoodieException("Row index should be continuous!");
       }
 
-      if (nextRecordInfo.getLeft().isPresent()) {
-        nextRecord = nextRecordInfo.getKey().get();
+      if (!nextRecordInfo.isDelete()) {
+        nextRecord = nextRecordInfo.getRecord();
       } else {
         throw new IllegalStateException("No deletes should exist in unmerged 
reading mode");
       }
@@ -87,7 +86,7 @@ public class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
   }
 
   @Override
-  public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator() 
{
+  public Iterator<BufferedRecord<T>> getLogRecordIterator() {
     return records.values().iterator();
   }
 
@@ -109,16 +108,15 @@ public class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
     try (ClosableIterator<T> recordIterator = 
recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
-            nextRecord, schema);
-        processNextDataRecord(nextRecord, metadata, putIndex++);
+        BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext, 
orderingFieldName, false);
+        processNextDataRecord(bufferedRecord, putIndex++);
       }
     }
   }
 
   @Override
-  public void processNextDataRecord(T record, Map<String, Object> metadata, 
Serializable index) {
-    records.put(index, Pair.of(Option.ofNullable(readerContext.seal(record)), 
metadata));
+  public void processNextDataRecord(BufferedRecord<T> record, Serializable 
index) {
+    records.put(index, record.toBinary(readerContext));
   }
 
   @Override
@@ -129,9 +127,7 @@ public class UnmergedFileGroupRecordBuffer<T> extends 
FileGroupRecordBuffer<T> {
   @Override
   public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable 
index) {
     // never used for now
-    records.put(index, Pair.of(Option.empty(), 
readerContext.generateMetadataForRecord(
-        deleteRecord.getRecordKey(), deleteRecord.getPartitionPath(),
-        getOrderingValue(readerContext, deleteRecord))));
+    records.put(index, BufferedRecord.forDeleteRecord(deleteRecord, 
getOrderingValue(readerContext, deleteRecord)));
   }
 
   @Override
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index db3846b459b..daac251d187 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -44,21 +44,20 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
 import static 
org.apache.hudi.common.table.read.FileGroupRecordBuffer.getOrderingValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -315,7 +314,7 @@ class TestFileGroupRecordBuffer {
   }
 
   @Test
-  void testProcessCustomDeleteRecord() {
+  void testProcessCustomDeleteRecord() throws IOException {
     String customDeleteKey = "op";
     String customDeleteValue = "d";
     when(schemaHandler.getCustomDeleteMarkerKeyValue())
@@ -337,17 +336,16 @@ class TestFileGroupRecordBuffer {
     record.put("ts", System.currentTimeMillis());
     record.put("op", "d");
     record.put("_hoodie_is_deleted", false);
+    when(readerContext.getOrderingValue(any(), any(), any())).thenReturn(1);
+    when(readerContext.convertValueToEngineType(any())).thenReturn(1);
+    BufferedRecord<GenericRecord> bufferedRecord = 
BufferedRecord.forRecordWithContext(record, schema, readerContext, 
Option.of("ts"), true);
 
-    Map<String, Object> metadata = new HashMap<>();
-    metadata.put(INTERNAL_META_RECORD_KEY, "12345");
-    metadata.put(INTERNAL_META_PARTITION_PATH, "partition1");
-    when(readerContext.getOrderingValue(any(), any(), any(), 
any())).thenReturn(1);
-    when(readerContext.generateMetadataForRecord(any(), any(), 
any())).thenReturn(metadata);
-    keyBasedBuffer.processDeleteRecord(record, metadata);
-    Map<Serializable, Pair<Option<GenericRecord>, Map<String, Object>>> 
records =
-        keyBasedBuffer.getLogRecords();
+    keyBasedBuffer.processNextDataRecord(bufferedRecord, "12345");
+    Map<Serializable, BufferedRecord<GenericRecord>> records = 
keyBasedBuffer.getLogRecords();
     assertEquals(1, records.size());
-    assertEquals(Pair.of(Option.empty(), metadata), records.get("12345"));
+    BufferedRecord<GenericRecord> deleteRecord = records.get("12345");
+    assertNull(deleteRecord.getRecordKey(), "The record key metadata field is 
missing");
+    assertEquals(1, deleteRecord.getOrderingValue());
 
     // CASE 2: With _hoodie_is_deleted is true.
     GenericRecord anotherRecord = new GenericData.Record(schema);
@@ -355,14 +353,13 @@ class TestFileGroupRecordBuffer {
     anotherRecord.put("ts", System.currentTimeMillis());
     anotherRecord.put("op", "i");
     anotherRecord.put("_hoodie_is_deleted", true);
+    bufferedRecord = BufferedRecord.forRecordWithContext(anotherRecord, 
schema, readerContext, Option.of("ts"), true);
 
-    Map<String, Object> anotherMetadata = new HashMap<>();
-    anotherMetadata.put(INTERNAL_META_RECORD_KEY, "54321");
-    anotherMetadata.put(INTERNAL_META_PARTITION_PATH, "partition2");
-    when(readerContext.generateMetadataForRecord(any(), any(), 
any())).thenReturn(anotherMetadata);
-    keyBasedBuffer.processDeleteRecord(anotherRecord, anotherMetadata);
+    keyBasedBuffer.processNextDataRecord(bufferedRecord, "54321");
     records = keyBasedBuffer.getLogRecords();
     assertEquals(2, records.size());
-    assertEquals(Pair.of(Option.empty(), anotherMetadata), 
records.get("54321"));
+    deleteRecord = records.get("54321");
+    assertNull(deleteRecord.getRecordKey(), "The record key metadata field is 
missing");
+    assertEquals(1, deleteRecord.getOrderingValue());
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 10f401b20b9..c5a3ba02f69 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -70,9 +70,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -193,27 +190,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       List<T> records = readRecordsFromFileGroup(getStorageConf(), 
getBasePath(), metaClient, fileSlices,
           avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
       HoodieReaderContext<T> readerContext = 
getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
-      Comparable orderingFieldValue = "100";
       for (Boolean isCompressionEnabled : new boolean[] {true, false}) {
-        try (ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, 
Object>>> spillableMap =
+        try (ExternalSpillableMap<Serializable, BufferedRecord<T>> 
spillableMap =
                  new ExternalSpillableMap<>(16L, baseMapPath, new 
DefaultSizeEstimator(),
                      new HoodieRecordSizeEstimator(avroSchema), diskMapType, 
new DefaultSerializer<>(), isCompressionEnabled, getClass().getSimpleName())) {
           Long position = 0L;
           for (T record : records) {
             String recordKey = readerContext.getRecordKey(record, avroSchema);
             //test key based
-            spillableMap.put(recordKey,
-                Pair.of(
-                    Option.ofNullable(readerContext.seal(record)),
-                    readerContext.generateMetadataForRecord(record, 
avroSchema)));
+            BufferedRecord<T> bufferedRecord = 
BufferedRecord.forRecordWithContext(record, avroSchema, readerContext, 
Option.of("timestamp"), false);
+            spillableMap.put(recordKey, 
bufferedRecord.toBinary(readerContext));
 
             //test position based
-            spillableMap.put(position++,
-                Pair.of(
-                    Option.ofNullable(readerContext.seal(record)),
-                    readerContext.generateMetadataForRecord(
-                        recordKey, dataGen.getPartitionPaths()[0],
-                        
readerContext.convertValueToEngineType(orderingFieldValue))));
+            spillableMap.put(position++, 
bufferedRecord.toBinary(readerContext));
           }
 
           assertEquals(records.size() * 2, spillableMap.size());
@@ -221,20 +210,18 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
           position = 0L;
           for (T record : records) {
             String recordKey = readerContext.getRecordKey(record, avroSchema);
-            Pair<Option<T>, Map<String, Object>> keyBased = 
spillableMap.get(recordKey);
+            BufferedRecord<T> keyBased = spillableMap.get(recordKey);
             assertNotNull(keyBased);
-            Pair<Option<T>, Map<String, Object>> positionBased = 
spillableMap.get(position++);
+            BufferedRecord<T> positionBased = spillableMap.get(position++);
             assertNotNull(positionBased);
-            assertRecordsEqual(avroSchema, record, keyBased.getLeft().get());
-            assertRecordsEqual(avroSchema, record, 
positionBased.getLeft().get());
-            assertEquals(keyBased.getRight().get(INTERNAL_META_RECORD_KEY), 
recordKey);
-            
assertEquals(positionBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
-            assertEquals(avroSchema, 
readerContext.getSchemaFromMetadata(keyBased.getRight()));
-            assertEquals(dataGen.getPartitionPaths()[0], 
positionBased.getRight().get(INTERNAL_META_PARTITION_PATH));
-            
assertEquals(readerContext.convertValueToEngineType(orderingFieldValue),
-                positionBased.getRight().get(INTERNAL_META_ORDERING_FIELD));
+            assertRecordsEqual(avroSchema, record, keyBased.getRecord());
+            assertRecordsEqual(avroSchema, record, positionBased.getRecord());
+            assertEquals(keyBased.getRecordKey(), recordKey);
+            assertEquals(positionBased.getRecordKey(), recordKey);
+            assertEquals(avroSchema, 
readerContext.getSchemaFromBufferRecord(keyBased));
+            // generate field value is hardcoded as 0 for ordering field: 
timestamp, see HoodieTestDataGenerator#generateRandomValue
+            assertEquals(readerContext.convertValueToEngineType(0L), 
positionBased.getOrderingValue());
           }
-
         }
       }
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index ad4f9aaba4b..ac89ffd0184 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -31,12 +31,12 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.storage.HoodieStorage;
@@ -51,6 +51,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
@@ -75,7 +76,6 @@ import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
 public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
   private final List<Predicate> predicates;
   private final Supplier<InternalSchemaManager> internalSchemaManager;
-  private RowDataSerializer rowDataSerializer;
   private final boolean utcTimezone;
 
   public FlinkRowDataReaderContext(
@@ -148,46 +148,45 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
   }
 
   @Override
-  public HoodieRecord<RowData> constructHoodieRecord(Option<RowData> 
recordOption, Map<String, Object> metadataMap) {
-    HoodieKey hoodieKey = new HoodieKey(
-        (String) metadataMap.get(INTERNAL_META_RECORD_KEY),
-        (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
-    RowData rowData = recordOption.get();
+  public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> 
bufferedRecord) {
+    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
     // delete record
-    if (recordOption.isEmpty()) {
-      Comparable orderingValue;
-      if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-        orderingValue = (Comparable) 
metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-      } else {
-        throw new HoodieException("There should be ordering value in 
metadataMap.");
-      }
-      return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, 
orderingValue, HoodieRecord.HoodieRecordType.FLINK);
+    if (bufferedRecord.isDelete()) {
+      return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, 
bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.FLINK);
     }
-    return new HoodieFlinkRecord(hoodieKey, rowData);
+    RowData rowData = bufferedRecord.getRecord();
+    HoodieOperation operation = 
HoodieOperation.fromValue(rowData.getRowKind().toByteValue());
+    return new HoodieFlinkRecord(hoodieKey, operation, 
bufferedRecord.getOrderingValue(), rowData);
   }
 
   @Override
-  public Comparable getOrderingValue(Option<RowData> recordOption, Map<String, 
Object> metadataMap, Schema schema, Option<String> orderingFieldName) {
-    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
-      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
-    }
-    if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
+  public Comparable getOrderingValue(
+      RowData record,
+      Schema schema,
+      Option<String> orderingFieldName) {
+    if (orderingFieldName.isEmpty()) {
       return DEFAULT_ORDERING_VALUE;
     }
     RowDataAvroQueryContexts.FieldQueryContext context = 
RowDataAvroQueryContexts.fromAvroSchema(schema, 
utcTimezone).getFieldQueryContext(orderingFieldName.get());
-    Comparable finalOrderingVal = (Comparable) 
context.getValAsJava(recordOption.get(), false);
-    metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
+    Comparable finalOrderingVal = (Comparable) context.getValAsJava(record, 
false);
     return finalOrderingVal;
   }
 
   @Override
   public RowData seal(RowData rowData) {
-    if (rowDataSerializer == null) {
-      RowType requiredRowType = (RowType) 
RowDataAvroQueryContexts.fromAvroSchema(getSchemaHandler().getRequiredSchema()).getRowType().getLogicalType();
-      rowDataSerializer = new RowDataSerializer(requiredRowType);
+    if (rowData instanceof BinaryRowData) {
+      return ((BinaryRowData) rowData).copy();
+    }
+    return rowData;
+  }
+
+  @Override
+  public RowData toBinaryRow(Schema avroSchema, RowData record) {
+    if (record instanceof BinaryRowData) {
+      return record;
     }
-    // copy is unnecessary if there is no caching in subsequent processing.
-    return rowDataSerializer.copy(rowData);
+    RowDataSerializer rowDataSerializer = 
RowDataAvroQueryContexts.getRowDataSerializer(avroSchema);
+    return rowDataSerializer.toBinaryRow(record);
   }
 
   @Override
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index b37c7d910dd..8abf5134ad6 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -222,13 +223,15 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
   }
 
   @Override
-  public HoodieRecord<ArrayWritable> 
constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> 
metadataMap) {
-    if (!recordOption.isPresent()) {
-      return new HoodieEmptyRecord<>(new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH)), 
HoodieRecord.HoodieRecordType.HIVE);
+  public HoodieRecord<ArrayWritable> 
constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+    if (bufferedRecord.isDelete()) {
+      return new HoodieEmptyRecord<>(
+          new HoodieKey(bufferedRecord.getRecordKey(), null),
+          HoodieRecord.HoodieRecordType.HIVE);
     }
-    Schema schema = getSchemaFromMetadata(metadataMap);
-    ArrayWritable writable = recordOption.get();
-    return new HoodieHiveRecord(new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, 
objectInspectorCache);
+    Schema schema = getSchemaFromBufferRecord(bufferedRecord);
+    ArrayWritable writable = bufferedRecord.getRecord();
+    return new HoodieHiveRecord(new HoodieKey(bufferedRecord.getRecordKey(), 
null), writable, schema, objectInspectorCache);
   }
 
   @Override
@@ -236,6 +239,11 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
     return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), 
record.get().length));
   }
 
+  @Override
+  public ArrayWritable toBinaryRow(Schema schema, ArrayWritable record) {
+    return record;
+  }
+
   @Override
   public ClosableIterator<ArrayWritable> 
mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator,
                                                                Schema 
skeletonRequiredSchema,
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index 34e23edc146..c4ef58d689c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -102,7 +102,7 @@ public class HoodieHiveRecord extends 
HoodieRecord<ArrayWritable> {
   }
 
   @Override
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
     String orderingField = ConfigUtils.getOrderingField(props);
     if (isNullOrEmpty(orderingField)) {
       return DEFAULT_ORDERING_VALUE;
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 9b987937276..e0a59b563c8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -62,8 +62,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
@@ -204,9 +202,9 @@ public class TestPositionBasedFileGroupRecordBuffer extends 
TestHoodieFileGroupR
     if (sameBaseInstantTime) {
       // If the log block's base instant time of record positions match the 
base file
       // to merge, the log records are stored based on the position
-      
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY),
+      assertNotNull(buffer.getLogRecords().get(0L).getRecordKey(),
           "the record key is set up for fallback handling");
-      
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_ORDERING_FIELD),
+      assertNotNull(buffer.getLogRecords().get(0L).getOrderingValue(),
           "the ordering value is set up for fallback handling");
     } else {
       // If the log block's base instant time of record positions does not 
match the
@@ -222,7 +220,7 @@ public class TestPositionBasedFileGroupRecordBuffer extends 
TestHoodieFileGroupR
     HoodieDeleteBlock deleteBlock = 
getDeleteBlockWithPositions(baseFileInstantTime);
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
-    
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+    assertNotNull(buffer.getLogRecords().get(0L).getRecordKey());
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 59f7375565c..b65aa9b70f2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -137,16 +137,6 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
         + "{\"name\": \"col2\", \"type\": \"long\" },"
         + "{ \"name\": \"col3\", \"type\": [\"null\", \"string\"], 
\"default\": null}]}")
     val row = InternalRow("item", 1000L, "blue")
-    val metadataMap = Map(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD -> 
100L)
-    assertEquals(100L, sparkReaderContext.getOrderingValue(
-      HOption.empty(), metadataMap.asJava.asInstanceOf[java.util.Map[String, 
Object]],
-      avroSchema, HOption.of(orderingFieldName)))
-    assertEquals(DEFAULT_ORDERING_VALUE, sparkReaderContext.getOrderingValue(
-      HOption.empty(), Map().asJava.asInstanceOf[java.util.Map[String, 
Object]],
-      avroSchema, HOption.of(orderingFieldName)))
-    assertEquals(DEFAULT_ORDERING_VALUE, sparkReaderContext.getOrderingValue(
-      HOption.of(row), Map().asJava.asInstanceOf[java.util.Map[String, 
Object]],
-      avroSchema, HOption.empty()))
     testGetOrderingValue(sparkReaderContext, row, avroSchema, 
orderingFieldName, 1000L)
     testGetOrderingValue(
       sparkReaderContext, row, avroSchema, "col3", 
UTF8String.fromString("blue"))
@@ -264,11 +254,8 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
                                    avroSchema: Schema,
                                    orderingColumn: String,
                                    expectedOrderingValue: Comparable[_]): Unit 
= {
-    val metadataMap = new util.HashMap[String, Object]()
     assertEquals(expectedOrderingValue, sparkReaderContext.getOrderingValue(
-      HOption.of(row), metadataMap, avroSchema, HOption.of(orderingColumn)))
-    assertEquals(expectedOrderingValue,
-      metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
+      row, avroSchema, HOption.of(orderingColumn)))
   }
 }
 

Reply via email to