This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ec466e187ea [HUDI-8654] Add base file instant time of record positions to the log block header (#12594)
ec466e187ea is described below

commit ec466e187ea7006b5bc565a20c70c91ccfacd37d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 29 12:00:22 2025 -0800

    [HUDI-8654] Add base file instant time of record positions to the log block header (#12594)
---
 .../cli/commands/TestHoodieLogFileCommand.java     |  4 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 20 ------
 .../timeline/versioning/v1/TimelineArchiverV1.java |  2 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     | 51 +++++++++-----
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |  3 +-
 .../utils/TestLegacyArchivedMetaEntryReader.java   |  2 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   |  2 +-
 .../testutils/HoodieFlinkWriteableTestTable.java   |  2 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  4 --
 .../hudi/common/config/HoodieReaderConfig.java     |  2 +-
 .../table/log/block/HoodieAvroDataBlock.java       |  5 +-
 .../common/table/log/block/HoodieCDCDataBlock.java |  2 +-
 .../common/table/log/block/HoodieDataBlock.java    | 22 +-----
 .../common/table/log/block/HoodieDeleteBlock.java  | 18 +----
 .../table/log/block/HoodieHFileDataBlock.java      |  2 +-
 .../common/table/log/block/HoodieLogBlock.java     | 60 +++++++++++++++-
 .../table/log/block/HoodieParquetDataBlock.java    |  3 +-
 .../common/table/read/HoodieFileGroupReader.java   | 10 +--
 .../HoodiePositionBasedFileGroupRecordBuffer.java  | 29 ++++++--
 .../testutils/reader/HoodieFileSliceTestUtils.java | 31 +++++----
 .../hudi/common/util/TestSerializationUtils.java   |  2 +-
 .../common/functional/TestHoodieLogFormat.java     | 79 ++++++++++++++++------
 .../TestHoodieLogFormatAppendFailure.java          |  2 +-
 .../table/log/block/TestHoodieDeleteBlock.java     |  2 +-
 .../common/testutils/HoodieCommonTestHarness.java  |  4 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  6 +-
 ...stHoodiePositionBasedFileGroupRecordBuffer.java | 48 +++++++++----
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |  2 +-
 .../hudi/functional/TestMORDataSourceStorage.scala | 61 +++++++++++++----
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  4 +-
 .../TestHoodieMetadataTableValidator.java          |  2 +-
 32 files changed, 304 insertions(+), 184 deletions(-)
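
The core of this change is a writer/reader handshake on record positions: each log block header now carries the instant time of the base file that its record positions were computed against, and the reader only trusts the positions when that instant time matches the base file it is actually merging with, falling back to key-based merging otherwise. A minimal, self-contained sketch of the idea follows; the class and BASE_INSTANT_KEY constant are illustrative stand-ins for HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, not the actual Hudi API.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Hypothetical sketch of the header handshake introduced by this commit.
    public class PositionHeaderSketch {
      static final String BASE_INSTANT_KEY = "BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS";

      // Writer side: stamp the header only when the base file instant is known.
      static Map<String, String> buildHeader(Optional<String> baseFileInstant) {
        Map<String, String> header = new HashMap<>();
        baseFileInstant.ifPresent(t -> header.put(BASE_INSTANT_KEY, t));
        return header;
      }

      // Reader side: positions are usable only if the stamp matches the base
      // file actually being merged.
      static boolean canUsePositions(Map<String, String> header, String baseFileInstant) {
        return baseFileInstant.equals(header.get(BASE_INSTANT_KEY));
      }

      public static void main(String[] args) {
        Map<String, String> header = buildHeader(Optional.of("001"));
        System.out.println(canUsePositions(header, "001")); // true: merge by position
        System.out.println(canUsePositions(header, "002")); // false: key-based fallback
      }
    }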

diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 9fab259df55..f108c46c61b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -120,7 +120,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-      dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
 
       Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new HashMap<>();
@@ -218,7 +218,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
     } finally {
       if (writer != null) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 82d34c5515d..3e28767b3e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
-import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -1661,23 +1660,4 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       }
     });
   }
-
-  protected final void maybeDisableWriteRecordPositions(HoodieTableMetaClient metaClient) {
-    // Disabling {@link WRITE_RECORD_POSITIONS} in the following two cases for correctness
-    // even if the record positions are enabled in MOR:
-    // (1) When there is a pending compaction, the new base files to be generated by compaction
-    // is not available during this transaction. Given the log files in MOR from a new transaction
-    // after a compaction is scheduled can be attached to the base file generated by the compaction
-    // in the latest file slice, the accurate record positions may not be derived.
-    // (2) When NBCC is enabled, the compaction can be scheduled while there are inflight
-    // deltacommits, and unlike OCC, such an inflight deltacommit updating the same file group
-    // under compaction can still be successfully committed.  This can also introduce the
-    // correctness problem as (1) that the positions in the log file can be inaccurate.
-    if (config.shouldWriteRecordPositions()
-        && config.getTableType() == HoodieTableType.MERGE_ON_READ
-        && (config.getWriteConcurrencyMode() == WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL
-        || !metaClient.getActiveTimeline().filterPendingCompactionTimeline().empty())) {
-      config.setValue(HoodieWriteConfig.WRITE_RECORD_POSITIONS, String.valueOf(false));
-    }
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c34b7ccb9de..594825ad9c1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -440,7 +440,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
       final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
       List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false, header, keyField);
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
       writer.appendBlock(block);
       records.clear();
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index e1103a8bd4f..107e0b62ef6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -92,11 +93,12 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
 
-  private final boolean shouldWriteRecordPositions;
   // Buffer for holding records in memory before they are flushed to disk
   private final List<HoodieRecord> recordList = new ArrayList<>();
  // Buffer for holding records (to be deleted), along with their position in log block, in memory before they are flushed to disk
  private final List<Pair<DeleteRecord, Long>> recordsToDeleteWithPositions = new ArrayList<>();
+  // Base file instant time of the record positions
+  private final Option<String> baseFileInstantTimeOfPositions;
   // Incoming records to be written to logs.
   protected Iterator<HoodieRecord<T>> recordItr;
   // Writer to log into the file group's latest slice.
@@ -158,9 +160,12 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
+    boolean shouldWriteRecordPositions = config.shouldWriteRecordPositions()
         // record positions supported only from table version 8
        && config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
+    this.baseFileInstantTimeOfPositions = shouldWriteRecordPositions
+        ? getBaseFileInstantTimeOfPositions()
+        : Option.empty();
   }
 
  public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -168,6 +173,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
    this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
   }
 
+  private Option<String> getBaseFileInstantTimeOfPositions() {
+    return hoodieTable.getHoodieView().getLatestBaseFile(partitionPath, fileId)
+        .map(HoodieBaseFile::getCommitTime);
+  }
+
  private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
     HoodieTableVersion tableVersion = hoodieTable.version();
     String prevCommit;
@@ -483,13 +493,16 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
            : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, shouldWriteRecordPositions,
-            getUpdatedHeader(header, config), keyField));
+        blocks.add(getDataBlock(config, pickLogDataBlockFormat(), recordList,
+            getUpdatedHeader(header, config, baseFileInstantTimeOfPositions),
+            keyField));
       }
 
       if (appendDeleteBlocks && !recordsToDeleteWithPositions.isEmpty()) {
-        blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, shouldWriteRecordPositions,
-            getUpdatedHeader(header, config)));
+        blocks.add(new HoodieDeleteBlock(
+            recordsToDeleteWithPositions,
+            getUpdatedHeader(
+                header, config, baseFileInstantTimeOfPositions)));
       }
 
       if (!blocks.isEmpty()) {
@@ -618,7 +631,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         LOG.error("Error writing record  " + indexedRecord.get(), e);
       }
     } else {
-      long position = shouldWriteRecordPositions ? record.getCurrentPosition() : -1L;
+      long position = baseFileInstantTimeOfPositions.isPresent() ? record.getCurrentPosition() : -1L;
      recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(record.getKey(), orderingVal), position));
     }
     numberOfRecords++;
@@ -665,7 +678,8 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   }
 
  private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header,
-                                                                  HoodieWriteConfig config) {
+                                                                  HoodieWriteConfig config,
+                                                                  Option<String> baseInstantTimeForPositions) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
     if (config.shouldWritePartialUpdates()) {
      // When enabling writing partial updates to the data blocks, the "IS_PARTIAL" flag is also
@@ -674,26 +688,31 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       updatedHeader.put(
           HeaderMetadataType.IS_PARTIAL, Boolean.toString(true));
     }
+    if (baseInstantTimeForPositions.isPresent()) {
+      updatedHeader.put(
+          HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS,
+          baseInstantTimeForPositions.get());
+    }
     return updatedHeader;
   }
 
-  private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
-                                         HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
-                                         List<HoodieRecord> records,
-                                         boolean shouldWriteRecordPositions,
-                                         Map<HeaderMetadataType, String> header,
-                                         String keyField) {
+  private static HoodieLogBlock getDataBlock(HoodieWriteConfig writeConfig,
+                                             HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
+                                             List<HoodieRecord> records,
+                                             Map<HeaderMetadataType, String> header,
+                                             String keyField) {
     switch (logDataBlockFormat) {
       case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, shouldWriteRecordPositions, header, keyField);
+        return new HoodieAvroDataBlock(records, header, keyField);
      case HFILE_DATA_BLOCK:
+        // Not supporting positions in HFile data blocks
+        header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
        return new HoodieHFileDataBlock(
            records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()),
            writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
-            shouldWriteRecordPositions,
             header,
             keyField,
             writeConfig.getParquetCompressionCodec(),
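
The handle now resolves the base file instant time once, at construction: positions are written only when the feature is enabled, the table version is at least eight, and a latest base file exists for the file group. A loose sketch of that decision under simplified, hypothetical names (the real code consults HoodieWriteConfig and the table's file system view):

    import java.util.Optional;

    // Hypothetical condensation of the constructor logic in HoodieAppendHandle.
    class AppendPositionDecisionSketch {
      static Optional<String> baseInstantForPositions(boolean writePositionsEnabled,
                                                      int tableVersion,
                                                      Optional<String> latestBaseFileCommitTime) {
        boolean shouldWrite = writePositionsEnabled && tableVersion >= 8;
        // An empty result means "do not write positions" downstream.
        return shouldWrite ? latestBaseFileCommitTime : Optional.empty();
      }
    }

Note also that HFile data blocks drop the header entry outright, since positions are not supported there.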
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9dafe3bad09..4b7332b5a64 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -948,7 +948,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       try {
        final Map<HeaderMetadataType, String> blockHeader = Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime);
 
-        final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
+        final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader);
 
        try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
            .onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), partitionName))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 414bbc3236e..3abdc44e042 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -165,7 +165,8 @@ public class HoodieMetadataWriteUtils {
        .withRecordMergeStrategyId(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)
        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
            .withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build())
-        .withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName());
+        .withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName())
+        .withWriteRecordPositionsEnabled(false);
 
     // RecordKey properties are needed for the metadata table records
     final Properties properties = new Properties();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index eb7aa2d35d9..929524d84fe 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -136,7 +136,7 @@ public class TestLegacyArchivedMetaEntryReader {
      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
      final String keyField = metaClient.getTableConfig().getRecordKeyFieldProp();
      List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false, header, keyField);
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
       writer.appendBlock(block);
       records.clear();
     }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index c52617e6a6e..c4633df6a9f 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -191,7 +191,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
           return null;
         }
       }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
-          false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+          header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
       return Pair.of(partitionPath, logWriter.getLogFile());
     }
   }
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 9b7f1ef2fb2..07af039be66 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -153,7 +153,7 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
           LOG.warn("Failed to convert record " + r.toString(), e);
           return null;
         }
-      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
       return Pair.of(partitionPath, logWriter.getLogFile());
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index c0b76892603..38c71f86383 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -127,7 +127,6 @@ public class SparkRDDWriteClient<T> extends
  public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
        initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
    table.validateUpsertSchema();
    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
@@ -142,7 +141,6 @@ public class SparkRDDWriteClient<T> extends
  public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
        initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
    table.validateUpsertSchema();
    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
@@ -234,7 +232,6 @@ public class SparkRDDWriteClient<T> extends
  @Override
  public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
    preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.delete(context, instantTime, HoodieJavaRDD.of(keys));
    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
@@ -244,7 +241,6 @@ public class SparkRDDWriteClient<T> extends
  @Override
  public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> preppedRecord, String instantTime) {
    HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PREPPED, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, table.getMetaClient());
    HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.deletePrepped(context,instantTime, HoodieJavaRDD.of(preppedRecord));
    HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 5a8eb69ad79..56935fc5d81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -68,7 +68,7 @@ public class HoodieReaderConfig extends HoodieConfig {
 
  public static final ConfigProperty<Boolean> MERGE_USE_RECORD_POSITIONS = ConfigProperty
       .key("hoodie.merge.use.record.positions")
-      .defaultValue(false)
+      .defaultValue(true)
       .markAdvanced()
       .sinceVersion("1.0.0")
       .withDocumentation("Whether to use positions in the block header for 
data blocks containing updates and delete blocks for merging.");
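
With the instant-time guard in place, the default of hoodie.merge.use.record.positions flips from false to true, so position-based merging is attempted whenever the header validates. The previous behavior can still be restored with an explicit override; a plain java.util.Properties example, shown only for illustration:

    import java.util.Properties;

    class ReaderConfigSketch {
      static Properties legacyMergeProps() {
        Properties props = new Properties();
        // Opt out of position-based merging, restoring the previous default of false.
        props.setProperty("hoodie.merge.use.record.positions", "false");
        return props;
      }
    }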
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 478e12eb014..488ffb1ce43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -88,11 +88,10 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   }
 
   public HoodieAvroDataBlock(@Nonnull List<HoodieRecord> records,
-                             boolean shouldWriteRecordPositions,
                              @Nonnull Map<HeaderMetadataType, String> header,
                              @Nonnull String keyField
   ) {
-    super(records, shouldWriteRecordPositions, header, new HashMap<>(), keyField);
+    super(records, header, new HashMap<>(), keyField);
   }
 
   @Override
@@ -235,7 +234,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    */
   @Deprecated
   public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
-    super(records, false, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
  public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index f32eeb0dfb7..44140b5b6af 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -49,7 +49,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
   public HoodieCDCDataBlock(List<HoodieRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
-    super(records, false, header, keyField);
+    super(records, header, keyField);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index b413d82fd2f..6f48cb27a74 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -42,7 +42,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -70,7 +69,6 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   private final boolean enablePointLookups;
 
   protected Schema readerSchema;
-  protected final boolean shouldWriteRecordPositions;
 
   //  Map of string schema to parsed schema.
  private static ConcurrentHashMap<String, Schema> schemaMap = new ConcurrentHashMap<>();
@@ -79,31 +77,15 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
   public HoodieDataBlock(List<HoodieRecord> records,
-                         boolean shouldWriteRecordPositions,
                          Map<HeaderMetadataType, String> header,
                          Map<FooterMetadataType, String> footer,
                          String keyFieldName) {
     super(header, footer, Option.empty(), Option.empty(), null, false);
-    if (shouldWriteRecordPositions) {
-      records.sort((o1, o2) -> {
-        long v1 = o1.getCurrentPosition();
-        long v2 = o2.getCurrentPosition();
-        return Long.compare(v1, v2);
-      });
-      if (isPositionValid(records.get(0).getCurrentPosition())) {
-        addRecordPositionsToHeader(
-            records.stream().map(HoodieRecord::getCurrentPosition).collect(Collectors.toSet()),
-            records.size());
-      } else {
-        LOG.warn("There are records without valid positions. "
-            + "Skip writing record positions to the data block header.");
-      }
-    }
+    addRecordPositionsIfRequired(records, HoodieRecord::getCurrentPosition);
     this.records = Option.of(records);
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
     this.readerSchema = getWriterSchema(super.getLogBlockHeader());
-    this.shouldWriteRecordPositions = shouldWriteRecordPositions;
     this.enablePointLookups = false;
   }
 
@@ -120,8 +102,6 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
                             String keyFieldName,
                             boolean enablePointLookups) {
    super(headers, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
-    // Setting `shouldWriteRecordPositions` to false as this constructor is only used by the reader
-    this.shouldWriteRecordPositions = false;
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
     this.readerSchema = containsPartialUpdates()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 3f5bfe51c78..c71db423856 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -56,7 +56,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 
 /**
 * Delete block contains a list of keys to be deleted from scanning the blocks so far.
@@ -76,24 +75,9 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
   private DeleteRecord[] recordsToDelete;
 
   public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,
-                           boolean shouldWriteRecordPositions,
                            Map<HeaderMetadataType, String> header) {
     this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
-    if (shouldWriteRecordPositions && !recordsToDelete.isEmpty()) {
-      recordsToDelete.sort((o1, o2) -> {
-        long v1 = o1.getRight();
-        long v2 = o2.getRight();
-        return Long.compare(v1, v2);
-      });
-      if (isPositionValid(recordsToDelete.get(0).getRight())) {
-        addRecordPositionsToHeader(
-            recordsToDelete.stream().map(Pair::getRight).collect(Collectors.toSet()),
-            recordsToDelete.size());
-      } else {
-        LOG.warn("There are delete records without valid positions. "
-            + "Skip writing record positions to the delete block header.");
-      }
-    }
+    addRecordPositionsIfRequired(recordsToDelete, Pair::getRight);
    this.recordsToDelete = recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 406bc0c6a03..4d1267701ba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -89,7 +89,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               String compressionCodec,
                               StoragePath pathForReader,
                               boolean useNativeHFileReader) {
-    super(records, false, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
+    super(records, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
     this.compressionCodec = Option.of(compressionCodec);
     this.pathForReader = pathForReader;
     this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 7d7da384d5d..04c7827b4e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -42,11 +42,14 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
@@ -145,7 +148,16 @@ public abstract class HoodieLogBlock {
    return LogReaderUtils.decodeRecordPositionsHeader(logBlockHeader.get(HeaderMetadataType.RECORD_POSITIONS));
   }
 
-  protected void addRecordPositionsToHeader(Set<Long> positionSet, int numRecords) {
+  /**
+   * @return base file instant time of the record positions if the record positions are enabled
+   * in the log block; {@code null} otherwise.
+   */
+  public String getBaseFileInstantTimeOfPositions() {
+    return logBlockHeader.get(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
+  protected void addRecordPositionsToHeader(Set<Long> positionSet,
+                                            int numRecords) {
     if (positionSet.size() == numRecords) {
       try {
        logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS, LogReaderUtils.encodePositions(positionSet));
@@ -159,6 +171,17 @@ public abstract class HoodieLogBlock {
     }
   }
 
+  protected boolean containsBaseFileInstantTimeOfPositions() {
+    return logBlockHeader.containsKey(
+        HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
+  protected void removeBaseFileInstantTimeOfPositions() {
+    LOG.warn("There are records without valid positions. "
+        + "Skip writing record positions to the block header.");
+    logBlockHeader.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
   /**
   * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
@@ -208,7 +231,8 @@ public abstract class HoodieLogBlock {
     COMPACTED_BLOCK_TIMES(HoodieTableVersion.FIVE),
     RECORD_POSITIONS(HoodieTableVersion.SIX),
     BLOCK_IDENTIFIER(HoodieTableVersion.SIX),
-    IS_PARTIAL(HoodieTableVersion.EIGHT);
+    IS_PARTIAL(HoodieTableVersion.EIGHT),
+    BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS(HoodieTableVersion.EIGHT);
 
     @SuppressWarnings("unused")
     private final HoodieTableVersion earliestTableVersion;
@@ -321,6 +345,38 @@ public abstract class HoodieLogBlock {
     return Option.of(content);
   }
 
+  /**
+   * Adds the record positions if the base file instant time of the positions exists
+   * in the log header and the record positions are all valid.
+   *
+   * @param records         records with valid or invalid positions
+   * @param getPositionFunc function to get the position from the record
+   * @param <T>             type of record
+   */
+  protected <T> void addRecordPositionsIfRequired(List<T> records,
+                                                  Function<T, Long> getPositionFunc) {
+    if (containsBaseFileInstantTimeOfPositions()) {
+      if (!isPositionValid(getPositionFunc.apply(records.get(0)))) {
+        // Short circuit in case all records do not have valid positions,
+        // e.g., BUCKET index cannot identify the record position with low overhead
+        removeBaseFileInstantTimeOfPositions();
+        return;
+      }
+      records.sort((o1, o2) -> {
+        long v1 = getPositionFunc.apply(o1);
+        long v2 = getPositionFunc.apply(o2);
+        return Long.compare(v1, v2);
+      });
+      if (isPositionValid(getPositionFunc.apply(records.get(0)))) {
+        addRecordPositionsToHeader(
+            records.stream().map(getPositionFunc).collect(Collectors.toSet()),
+            records.size());
+      } else {
+        removeBaseFileInstantTimeOfPositions();
+      }
+    }
+  }
+
   /**
   * When lazyReading of blocks is turned on, inflate the content of a log block from disk.
    */
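
The per-block sorting and validation previously duplicated in HoodieDataBlock and HoodieDeleteBlock is consolidated into addRecordPositionsIfRequired above: if the header carries the base file instant time, records are sorted by position and the positions are written only when every record has a valid one; otherwise the header entry is removed so readers never see a dangling stamp. A standalone sketch of that flow, assuming a negative sentinel for invalid positions as Hudi uses:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    class PositionValidationSketch {
      // Returns the sorted positions, or null when they must not be written.
      static <T> List<Long> validPositionsOrNull(List<T> records, Function<T, Long> pos) {
        if (records.isEmpty() || pos.apply(records.get(0)) < 0) {
          return null; // short circuit, e.g., BUCKET index records carry no positions
        }
        records.sort((a, b) -> Long.compare(pos.apply(a), pos.apply(b)));
        // After sorting, the minimum is first; a negative minimum means at least
        // one record lacks a valid position, so positions are dropped wholesale.
        if (pos.apply(records.get(0)) < 0) {
          return null;
        }
        List<Long> positions = new ArrayList<>();
        records.forEach(r -> positions.add(pos.apply(r)));
        return positions;
      }
    }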
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 45ba608b3e5..8d9b1221c54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -69,14 +69,13 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
   }
 
   public HoodieParquetDataBlock(List<HoodieRecord> records,
-                                boolean shouldWriteRecordPositions,
                                 Map<HeaderMetadataType, String> header,
                                 String keyField,
                                 String compressionCodecName,
                                 double expectedCompressionRatio,
                                 boolean useDictionaryEncoding
   ) {
-    super(records, shouldWriteRecordPositions, header, new HashMap<>(), keyField);
+    super(records, header, new HashMap<>(), keyField);
 
     this.compressionCodecName = Option.of(compressionCodecName);
     this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index e8cc9133f4d..bf9653efbf3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -122,8 +122,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
    this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
    this.readStats = new HoodieReadStats();
    this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-        tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge,
-        shouldUseRecordPosition, readStats);
+        tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
+        isSkipMerge, shouldUseRecordPosition, readStats);
   }
 
   /**
@@ -133,6 +133,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
                                                             HoodieTableMetaClient hoodieTableMetaClient,
                                                             RecordMergeMode recordMergeMode,
                                                             TypedProperties props,
+                                                             Option<HoodieBaseFile> baseFileOption,
                                                             boolean hasNoLogFiles,
                                                             boolean isSkipMerge,
                                                             boolean shouldUseRecordPosition,
@@ -142,9 +143,10 @@ public final class HoodieFileGroupReader<T> implements Closeable {
    } else if (isSkipMerge) {
      return new HoodieUnmergedFileGroupRecordBuffer<>(
          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
-    } else if (shouldUseRecordPosition) {
+    } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
      return new HoodiePositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
+          Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
    } else {
      return new HoodieKeyBasedFileGroupRecordBuffer<>(
          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index a96982229da..9adc02f94c6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
@@ -64,6 +65,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
 
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
  public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
+  protected final String baseFileInstantTime;
   private long nextRecordPosition = 0L;
   private boolean needToDoHybridStrategy = false;
 
@@ -72,9 +74,11 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
                                                  RecordMergeMode recordMergeMode,
                                                  Option<String> partitionNameOverrideOpt,
                                                  Option<String[]> partitionPathFieldOpt,
+                                                  String baseFileInstantTime,
                                                  TypedProperties props,
                                                  HoodieReadStats readStats) {
    super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+    this.baseFileInstantTime = baseFileInstantTime;
   }
 
   @Override
@@ -89,7 +93,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
       return;
     }
     // Extract positions from data block.
-    List<Long> recordPositions = extractRecordPositions(dataBlock);
+    List<Long> recordPositions = extractRecordPositions(dataBlock, baseFileInstantTime);
     if (recordPositions == null) {
       LOG.warn("Falling back to key based merge for Read");
       fallbackToKeyBasedBuffer();
@@ -166,7 +170,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
       return;
     }
 
-    List<Long> recordPositions = extractRecordPositions(deleteBlock);
+    List<Long> recordPositions = extractRecordPositions(deleteBlock, baseFileInstantTime);
     if (recordPositions == null) {
       LOG.warn("Falling back to key based merge for Read");
       fallbackToKeyBasedBuffer();
@@ -281,15 +285,26 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
   }
 
   /**
-   * Extract the record positions from a log block header.
+   * Extracts valid record positions from a log block header.
    *
-   * @param logBlock
-   * @return
-   * @throws IOException
+   * @param logBlock            {@link HoodieLogBlock} instance of the log block
+   * @param baseFileInstantTime base file instant time for the file group to read
+   *
+   * @return valid record positions
+   * @throws IOException upon I/O errors
    */
-  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
+  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock,
+                                                     String baseFileInstantTime) throws IOException {
    List<Long> blockPositions = new ArrayList<>();
 
+    String blockBaseFileInstantTime = logBlock.getBaseFileInstantTimeOfPositions();
+    if (StringUtils.isNullOrEmpty(blockBaseFileInstantTime) || !baseFileInstantTime.equals(blockBaseFileInstantTime)) {
+      LOG.debug("The record positions cannot be used because the base file instant time "
+              + "is either missing or different from the base file to merge. "
+              + "Instant time in the header: {}, base file instant time of the file group: {}.",
+          blockBaseFileInstantTime, baseFileInstantTime);
+      return null;
+    }
     Roaring64NavigableMap positions = logBlock.getRecordPositions();
     if (positions == null || positions.isEmpty()) {
       LOG.warn("No record position info is found when attempt to do position 
based merge.");
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 6995a2f0c25..594cbc1b8ca 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -70,6 +70,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
 import static org.apache.hudi.common.testutils.FileCreateUtilsLegacy.baseFileName;
@@ -168,7 +169,6 @@ public class HoodieFileSliceTestUtils {
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       StoragePath logFilePath,
-      boolean writePositions,
       Map<String, Long> keyToPositionMap
   ) {
     return createDataBlock(
@@ -176,19 +176,17 @@ public class HoodieFileSliceTestUtils {
        records.stream().map(r -> new HoodieAvroIndexedRecord(r, new HoodieRecordLocation("", "", keyToPositionMap.get(r.get(RECORD_KEY_INDEX)))))
             .collect(Collectors.toList()),
         header,
-        logFilePath,
-        writePositions);
+        logFilePath);
   }
 
   private static HoodieDataBlock createDataBlock(
       HoodieLogBlock.HoodieLogBlockType dataBlockType,
       List<HoodieRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
-      StoragePath pathForReader,
-      boolean writePositions
-  ) {
+      StoragePath pathForReader) {
     switch (dataBlockType) {
       case CDC_DATA_BLOCK:
+        header.remove(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieCDCDataBlock(
             records,
             header,
@@ -196,10 +194,10 @@ public class HoodieFileSliceTestUtils {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(
             records,
-            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
+        header.remove(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieHFileDataBlock(
             records,
             header,
@@ -209,7 +207,6 @@ public class HoodieFileSliceTestUtils {
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
-            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD,
             PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
@@ -226,7 +223,6 @@ public class HoodieFileSliceTestUtils {
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       Schema schema,
       Properties props,
-      boolean writePositions,
       Map<String, Long> keyToPositionMap
   ) {
     List<HoodieRecord> hoodieRecords = records.stream()
@@ -241,9 +237,7 @@ public class HoodieFileSliceTestUtils {
             r -> Pair.of(DeleteRecord.create(
                r.getKey(), r.getOrderingValue(schema, props)), r.getCurrentLocation().getPosition()))
             .collect(Collectors.toList()),
-        writePositions,
-        header
-    );
+        header);
   }
 
   public static HoodieBaseFile createBaseFile(
@@ -291,6 +285,7 @@ public class HoodieFileSliceTestUtils {
       List<IndexedRecord> records,
       Schema schema,
       String fileId,
+      String baseFileInstantTime,
       String logInstantTime,
       int version,
       HoodieLogBlock.HoodieLogBlockType blockType,
@@ -308,14 +303,19 @@ public class HoodieFileSliceTestUtils {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, logInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      if (writePositions) {
+        header.put(
+            BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS,
+            baseFileInstantTime);
+      }
 
       if (blockType != DELETE_BLOCK) {
         HoodieDataBlock dataBlock = getDataBlock(
-            blockType, records, header, new StoragePath(logFilePath), writePositions, keyToPositionMap);
+            blockType, records, header, new StoragePath(logFilePath), keyToPositionMap);
         writer.appendBlock(dataBlock);
       } else {
-        HoodieDeleteBlock deleteBlock = getDeleteBlock(
-            records, header, schema, PROPERTIES, writePositions, keyToPositionMap);
+        HoodieDeleteBlock deleteBlock =
+            getDeleteBlock(records, header, schema, PROPERTIES, keyToPositionMap);
         writer.appendBlock(deleteBlock);
       }
     }
@@ -373,6 +373,7 @@ public class HoodieFileSliceTestUtils {
           records,
           schema,
           fileId,
+          baseFilePlan.getInstantTime(),
           logFilePlan.getInstantTime(),
           i,
           blockType,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index a4d7cb29023..4e45bc184aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -74,7 +74,7 @@ public class TestSerializationUtils {
    DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
    List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
    deleteRecordList.add(Pair.of(deleteRecord, -1L));
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, Collections.emptyMap());
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, Collections.emptyMap());
 
     byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
     byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 5ac7ee5ffe3..098911b7abc 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -26,8 +26,10 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -42,7 +44,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.LogReaderUtils;
 import org.apache.hudi.common.table.log.TestLogReaderUtils;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -87,6 +88,7 @@ import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -109,8 +111,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.model.HoodieRecordLocation.INVALID_POSITION;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -1358,7 +1363,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deletedRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock);
 
     List<String> allLogFiles =
@@ -1499,7 +1504,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       deleteRecordList.add(Pair.of(
           DeleteRecord.create(deletedKey.getRecordKey(), 
deletedKey.getPartitionPath()), -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
false, header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, 
header);
     writer.appendBlock(deleteBlock);
 
     List<String> allLogFiles =
@@ -1522,7 +1527,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       deleteRecordList.add(Pair.of(
           DeleteRecord.create(deletedKey.getRecordKey(), deletedKey.getPartitionPath()), -1L));
     }
-    deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, deleteBlockHeader);
+    deleteBlock = new HoodieDeleteBlock(deleteRecordList, deleteBlockHeader);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtilsLegacy.createDeltaCommit(basePath, "102", storage);
@@ -1614,7 +1619,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deleteRecords1) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock1);
 
     // Delete another 10 keys with -1 as orderingVal.
@@ -1626,7 +1631,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
             ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1), -1L))
         .collect(Collectors.toList());
 
-    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock2);
 
     // Delete another 10 keys with +1 as orderingVal.
@@ -1641,7 +1646,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deletedRecords3) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock3);
 
     List<String> allLogFiles =
@@ -1740,7 +1745,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deleteRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1802,7 +1807,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deleteRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1895,7 +1900,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deleteRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, header);
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
     writer.appendBlock(deleteBlock);
 
     FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1980,7 +1985,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (DeleteRecord dr : deletedRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, false, header));
+    writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, header));
 
     copyOfRecords2.addAll(copyOfRecords1);
     List<String> originalKeys =
@@ -2790,19 +2795,51 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  public void testGetRecordPositions(boolean addRecordPositionsHeader) throws IOException {
+  @CsvSource(value = {"false,false,false", "false,false,true",
+      "true,false,false", "true,false,true", "true,true,false", "true,true,true"})
+  public void testGetRecordPositions(boolean recordWithPositions,
+                                     boolean allValidPositions,
+                                     boolean addBaseFileInstantTimeOfPositions) throws IOException {
     Map<HeaderMetadataType, String> header = new HashMap<>();
-    Set<Long> positions = new HashSet<>();
-    if (addRecordPositionsHeader) {
-      positions = TestLogReaderUtils.generatePositions();
-      String content = LogReaderUtils.encodePositions(positions);
-      header.put(HeaderMetadataType.RECORD_POSITIONS, content);
+    List<Long> positions = new ArrayList<>();
+    if (recordWithPositions) {
+      positions.addAll(TestLogReaderUtils.generatePositions());
+      if (!allValidPositions) {
+        positions.add(INVALID_POSITION);
+        positions.set(positions.size() / 2, INVALID_POSITION);
+      }
     }
-    HoodieLogBlock logBlock = new HoodieDeleteBlock(Collections.emptyList(), addRecordPositionsHeader, header);
-    if (addRecordPositionsHeader) {
-      TestLogReaderUtils.assertPositionEquals(positions, logBlock.getRecordPositions());
+
+    List<Pair<DeleteRecord, Long>> deleteRecordList = positions.isEmpty()
+        ? IntStream.range(0, 10).boxed()
+        .map(i -> Pair.of(DeleteRecord.create("key" + i, "partition"), INVALID_POSITION))
+        .collect(Collectors.toList())
+        : IntStream.range(0, positions.size()).boxed()
+        .map(i -> Pair.of(DeleteRecord.create("key" + i, "partition"), positions.get(i)))
+        .collect(Collectors.toList());
+    List<HoodieRecord> recordList = positions.isEmpty()
+        ? IntStream.range(0, 10).boxed()
+        .map(i -> new HoodieAvroRecord(new HoodieKey("key" + i, "partition"), null, HoodieOperation.INSERT,
+            new HoodieRecordLocation("001", "file1", INVALID_POSITION), null))
+        .collect(Collectors.toList())
+        : IntStream.range(0, positions.size()).boxed()
+        .map(i -> new HoodieAvroRecord(new HoodieKey("key" + i, "partition"), null, HoodieOperation.INSERT,
+            new HoodieRecordLocation("001", "file1", positions.get(i)), null))
+        .collect(Collectors.toList());
+
+    header.put(HeaderMetadataType.SCHEMA, NULL_SCHEMA);
+    if (addBaseFileInstantTimeOfPositions) {
+      header.put(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, "001");
     }
+    HoodieLogBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
+    HoodieLogBlock dataBlock = new HoodieAvroDataBlock(recordList, header, "key");
+    boolean hasPositions = (recordWithPositions && allValidPositions && addBaseFileInstantTimeOfPositions);
+    String expectedInstantTime = hasPositions ? "001" : null;
+    Set<Long> expectedPositions = hasPositions ? new HashSet<>(positions) : Collections.emptySet();
+    assertEquals(expectedInstantTime, deleteBlock.getBaseFileInstantTimeOfPositions());
+    TestLogReaderUtils.assertPositionEquals(expectedPositions, deleteBlock.getRecordPositions());
+    assertEquals(expectedInstantTime, dataBlock.getBaseFileInstantTimeOfPositions());
+    TestLogReaderUtils.assertPositionEquals(expectedPositions, dataBlock.getRecordPositions());
   }
 
   private static Stream<Arguments> testArguments() {
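Taken together, the assertions above pin down the new contract: a log block exposes record positions only when every record carries a valid position and the BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS header names the base file instant the positions were computed against. A hedged sketch of exercising that contract (the "001" instant and key/partition values are illustrative; schema and imports as in the earlier sketch):

    Map<HeaderMetadataType, String> header = new HashMap<>();
    header.put(HeaderMetadataType.SCHEMA, schema.toString());
    // Record which base file the positions below were computed against.
    header.put(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, "001");

    List<Pair<DeleteRecord, Long>> deletes = new ArrayList<>();
    deletes.add(Pair.of(DeleteRecord.create("key0", "partition"), 42L)); // a valid position

    HoodieLogBlock block = new HoodieDeleteBlock(deletes, header);
    // Both are exposed only because all positions are valid AND the header is
    // present; with any invalid position or a missing header, the test above
    // expects null and an empty position set instead.
    String baseInstant = block.getBaseFileInstantTimeOfPositions(); // "001"
    Set<Long> positions = block.getRecordPositions();               // {42}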
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 2175867f8ba..2fd6f4dbf5c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -114,7 +114,7 @@ public class TestHoodieLogFormatAppendFailure {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
     HoodieAvroDataBlock dataBlock =
-        new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
 
     Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
         .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
index d55139fe06e..ba513e391cd 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -121,7 +121,7 @@ public class TestHoodieDeleteBlock {
     for (DeleteRecord dr : deleteRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, new HashMap<>());
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, new HashMap<>());
     byte[] contentBytes = deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorage());
     HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
         Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), new HashMap<>());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index eb7a04c83e2..590a7818c91 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -372,11 +372,11 @@ public class HoodieCommonTestHarness {
       case CDC_DATA_BLOCK:
         return new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+        return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
       case PARQUET_DATA_BLOCK:
-        return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
+        return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
       default:
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 972a57c6a4f..c766076d66a 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -423,11 +423,11 @@ public class InputFormatTestUtil {
       dataBlock = new HoodieHFileDataBlock(
           hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
     } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-      dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header,
+      dataBlock = new HoodieParquetDataBlock(hoodieRecords, header,
           HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
     } else {
-      dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header,
-          HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      dataBlock = new HoodieAvroDataBlock(
+          hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
     }
     writer.appendBlock(dataBlock);
     return writer;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index e27c96d08b3..3c80ef83919 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -51,6 +51,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -62,6 +64,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -75,7 +78,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
   private String partitionPath;
   private HoodieReadStats readStats;
 
-  public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
+  public void prepareBuffer(RecordMergeMode mergeMode, String baseFileInstantTime) throws Exception {
     Map<String, String> writeConfigs = new HashMap<>();
     writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
     writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
@@ -139,14 +142,19 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
         mergeMode,
         partitionNameOpt,
         partitionFields,
+        baseFileInstantTime,
         props,
         readStats);
   }
 
-  public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
+  public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,
+                                                                  String baseFileInstantTime) {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, avroSchema.toString());
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    if (shouldWriteRecordPositions) {
+      header.put(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, baseFileInstantTime);
+    }
     return header;
   }
 
@@ -161,7 +169,8 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
     return deletedRecords;
   }
 
-  public HoodieDeleteBlock getDeleteBlockWithPositions() throws IOException, URISyntaxException {
+  public HoodieDeleteBlock getDeleteBlockWithPositions(String baseFileInstantTime)
+      throws IOException, URISyntaxException {
     List<DeleteRecord> deletedRecords = getDeleteRecords();
     List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
 
@@ -169,7 +178,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
     for (DeleteRecord dr : deletedRecords) {
       deleteRecordList.add(Pair.of(dr, position++));
     }
-    return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+    return new HoodieDeleteBlock(deleteRecordList, getHeader(true, baseFileInstantTime));
   }
 
   public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws IOException, URISyntaxException {
@@ -179,23 +188,34 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
     for (DeleteRecord dr : deletedRecords) {
       deleteRecordList.add(Pair.of(dr, -1L));
     }
-    return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+    return new HoodieDeleteBlock(deleteRecordList, getHeader(false, ""));
   }
 
-  @Test
-  public void testProcessDeleteBlockWithPositions() throws Exception {
-    prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
-    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testProcessDeleteBlockWithPositions(boolean sameBaseInstantTime) throws Exception {
+    String baseFileInstantTime = "090";
+    prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, baseFileInstantTime);
+    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(
+        sameBaseInstantTime ? baseFileInstantTime : baseFileInstantTime + "1");
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
-    // With record positions, we do not need the record keys.
-    assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+    if (sameBaseInstantTime) {
+      // If the log block's base instant time of record positions matches the base file
+      // to merge, the log records are stored based on the position
+      assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+    } else {
+      // If the log block's base instant time of record positions does not match the
+      // base file to merge, the log records are stored based on the record key
+      assertNull(buffer.getLogRecords().get(0L));
+    }
   }
 
   @Test
   public void testProcessDeleteBlockWithCustomMerger() throws Exception {
-    prepareBuffer(RecordMergeMode.CUSTOM);
-    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+    String baseFileInstantTime = "090";
+    prepareBuffer(RecordMergeMode.CUSTOM, baseFileInstantTime);
+    HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(baseFileInstantTime);
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
     assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
@@ -203,7 +223,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
 
   @Test
   public void testProcessDeleteBlockWithoutPositions() throws Exception {
-    prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
+    prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, "090");
     HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
     buffer.processDeleteBlock(deleteBlock);
     assertEquals(50, buffer.getLogRecords().size());
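The parameterization above captures the behavioral core of this commit: the position-based record buffer only trusts positions whose recorded base file instant time matches the base file it is merging with, and otherwise falls back to key-based lookup. A rough sketch of that guard, with an illustrative helper name rather than the exact buffer internals:

    // Positions in a log block are usable only if the block declares they were
    // computed against the same base file instant this reader is merging with.
    static boolean canUseRecordPositions(HoodieLogBlock logBlock, String baseFileInstantTime) {
      String positionBaseInstant = logBlock.getBaseFileInstantTimeOfPositions();
      return positionBaseInstant != null && positionBaseInstant.equals(baseFileInstantTime);
    }

When the guard fails, the delete records are keyed by record key instead of position, which is exactly what the sameBaseInstantTime = false branch asserts.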
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 6d1d5120ff8..05abd8f59dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -454,7 +454,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
     List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-    return new HoodieAvroDataBlock(hoodieRecords, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    return new HoodieAvroDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
   private String generateFakeWriteToken(String correctWriteToken) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9fb8f455f09..6d2ec1cbd81 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieLogFile, HoodieTableType, WriteConcurrencyMode}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.view.FileSystemViewManager
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.util.StringUtils
@@ -43,7 +44,7 @@ import org.apache.spark.sql.functions.{col, lit}
 import org.junit.jupiter.api.{Tag, Test}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.collection.JavaConverters._
 
@@ -191,15 +192,16 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testAutoDisablingRecordPositionsUnderPendingCompaction(enableNBCC: Boolean): Unit = {
+  @CsvSource(value = Array("false,false", "true,true", "true,false"))
+  def testAutoDisablingRecordPositionsUnderPendingCompaction(writeRecordPosition: Boolean,
+                                                             enableNBCC: Boolean): Unit = {
     val options = Map(
       "hoodie.insert.shuffle.parallelism" -> "4",
       "hoodie.upsert.shuffle.parallelism" -> "4",
       "hoodie.bulkinsert.shuffle.parallelism" -> "2",
       "hoodie.delete.shuffle.parallelism" -> "1",
       "hoodie.merge.small.file.group.candidates.limit" -> "0",
-      HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> "true",
+      HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> writeRecordPosition.toString,
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName,
       DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
@@ -241,6 +243,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
       .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true").load(basePath).count())
 
     val metaClient = HoodieTestUtils.createMetaClient(storage.getConf, basePath)
+    var logFileList = List[HoodieLogFile]()
     // Upsert
     for (i <- 1 to 3) {
       // Generate some deletes so that if the record positions are still enabled during pending
@@ -272,12 +275,11 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
       assertEquals(1,
         metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants())
       assertEquals(i + 1, metaClient.getActiveTimeline.getDeltaCommitTimeline.countInstants())
-      // The deltacommit from the first round should write record positions in the log files
-      // since it happens before the pending compaction.
-      // The deltacommit from the second and third round should not write record positions in the
-      // log files since it happens after the pending compaction
-      validateRecordPositionsInLogFiles(
-        metaClient, shouldContainRecordPosition = !enableNBCC && i == 1)
+      // The deltacommits from all three rounds should write record positions in the
+      // log files, and the base file instant time of the record positions should
+      // match the latest base file before compaction happens.
+      logFileList = validateRecordPositionsInLogFiles(
+        metaClient, writeRecordPosition && !enableNBCC)
     }
 
     for (i <- 4 to 6) {
@@ -308,17 +310,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
       assertTrue(metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().empty())
       assertEquals(1, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants())
       assertEquals(i + 1, metaClient.getActiveTimeline.getDeltaCommitTimeline.countInstants())
-      // The deltacommit from the forth round should not write record positions in the log files
-      // since it happens before the compaction is executed.
+      // The deltacommit from the fourth round should write record positions in the log files,
+      // but the base file instant time of the record positions should not match the latest
+      // base file, since the compaction happens afterwards.
+      if (i == 4) {
+        // Also revalidate the log files generated from the third round, as the base instant
+        // time should not match the latest base file after compaction
+        validateRecordPositionsInLogFiles(
+          metaClient, shouldContainRecordPosition = writeRecordPosition && !enableNBCC,
+          logFileList, shouldBaseFileInstantTimeMatch = false)
+      }
       // The deltacommit from the fifth and sixth round should write record positions in the log
-      // files since it happens after the completed compaction
+      // files, and the base file instant time of the record positions should match the latest
+      // base file generated by the compaction.
       validateRecordPositionsInLogFiles(
-        metaClient, shouldContainRecordPosition = !enableNBCC && i != 4)
+        metaClient, shouldContainRecordPosition = writeRecordPosition && !enableNBCC,
+        shouldBaseFileInstantTimeMatch = i != 4)
     }
   }
 
   def validateRecordPositionsInLogFiles(metaClient: HoodieTableMetaClient,
-                                        shouldContainRecordPosition: Boolean): Unit = {
+                                        shouldContainRecordPosition: Boolean,
+                                        shouldBaseFileInstantTimeMatch: Boolean = true): List[HoodieLogFile] = {
     val instant = metaClient.getActiveTimeline.getDeltaCommitTimeline.lastInstant().get()
     val commitMetadata = metaClient.getCommitMetadataSerDe.deserialize(
       instant, metaClient.getActiveTimeline.getInstantDetails(instant).get,
@@ -329,7 +342,18 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
       .map(e => new HoodieLogFile(new StoragePath(e)))
       .toList
     assertFalse(logFileList.isEmpty)
+    validateRecordPositionsInLogFiles(
+      metaClient, shouldContainRecordPosition, logFileList, shouldBaseFileInstantTimeMatch)
+    logFileList
+  }
+
+  def validateRecordPositionsInLogFiles(metaClient: HoodieTableMetaClient,
+                                        shouldContainRecordPosition: Boolean,
+                                        logFileList: List[HoodieLogFile],
+                                        shouldBaseFileInstantTimeMatch: Boolean): Unit = {
     val schema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val fsv = FileSystemViewManager.createInMemoryFileSystemView(
+      context(), metaClient, HoodieMetadataConfig.newBuilder().build())
     logFileList.foreach(filename => {
       val logFormatReader = new HoodieLogFileReader(metaClient.getStorage, filename, schema, 81920)
       var numBlocks = 0
@@ -337,6 +361,13 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
         val logBlock = logFormatReader.next()
         val recordPositions = logBlock.getRecordPositions
         assertEquals(shouldContainRecordPosition, !recordPositions.isEmpty)
+        if (shouldContainRecordPosition) {
+          val baseFile = fsv.getLatestBaseFile("", filename.getFileId)
+          assertTrue(baseFile.isPresent)
+          assertEquals(
+            shouldBaseFileInstantTimeMatch,
+            baseFile.get().getCommitTime.equals(logBlock.getBaseFileInstantTimeOfPositions))
+        }
         numBlocks += 1
       }
       logFormatReader.close()
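For hand-inspection of a log file, the Scala helper above translates to a short Java loop; this is a sketch mirroring the test (same 81920 buffer size, with metaClient, logFile, and schema assumed from context):

    // Walk the blocks of one log file and print the position metadata
    // introduced by this commit.
    HoodieLogFileReader reader =
        new HoodieLogFileReader(metaClient.getStorage(), logFile, schema, 81920);
    while (reader.hasNext()) {
      HoodieLogBlock block = reader.next();
      System.out.println("positions: " + block.getRecordPositions()
          + ", base file instant: " + block.getBaseFileInstantTimeOfPositions());
    }
    reader.close();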
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 15032990731..526ead7e02d 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -717,7 +717,7 @@ public class HiveTestUtil {
     Map<HeaderMetadataType, String> header = new HashMap<>(2);
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
     logWriter.appendBlock(dataBlock);
     logWriter.close();
     return logWriter.getLogFile();
@@ -735,7 +735,7 @@ public class HiveTestUtil {
     Map<HeaderMetadataType, String> header = new HashMap<>(2);
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
     logWriter.appendBlock(dataBlock);
     logWriter.close();
     return logWriter.getLogFile();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 793c3bb0422..7e424968a4c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -853,7 +853,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
         header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
         header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
         writer.appendBlock(new HoodieAvroDataBlock(
-            Collections.emptyList(), false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+            Collections.emptyList(), header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
       } else {
         header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
         header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, baseInstantTime);

