vinothchandar commented on code in PR #12321:
URL: https://github.com/apache/hudi/pull/12321#discussion_r1855506943


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1044,29 +1042,28 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood
 
   /**
    * Update from {@code HoodieCommitMetadata}.
-   *
    * @param commitMetadata {@code HoodieCommitMetadata}
    * @param instantTime    Timestamp at which the commit was performed
    */
   @Override
-  public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
+  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     processAndCommit(instantTime, () -> {
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
               engineContext, dataWriteConfig, commitMetadata, instantTime, dataMetaClient,
               enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
               dataWriteConfig.getBloomIndexParallelism(), dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-              dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getMetadataConfig());
+              dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(), dataWriteConfig.getWritesFileIdEncoding(),
+              dataWriteConfig.getMetadataConfig());
 
       // Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
       if (dataWriteConfig.isRecordIndexEnabled()) {
-        HoodieData<HoodieRecord> updatesFromWriteStatuses = getRecordIndexUpserts(writeStatus);
-        HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(updatesFromWriteStatuses, commitMetadata);
-        partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), updatesFromWriteStatuses.union(additionalUpdates));
+        HoodieData<HoodieRecord> additionalUpdates = getRecordIndexAdditionalUpserts(partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()), commitMetadata);
+        partitionToRecordMap.put(RECORD_INDEX.getPartitionPath(), partitionToRecordMap.get(MetadataPartitionType.RECORD_INDEX.getPartitionPath()).union(additionalUpdates));
       }
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, partitionToRecordMap);
-      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, writeStatus);
+      updateSecondaryIndexIfPresent(commitMetadata, partitionToRecordMap, instantTime);

Review Comment:
   nts: we seem to be writing these indexes independently - one for each partition. As long as there is a single delta commit, this should be fine.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1141,28 +1140,22 @@ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
         .forEach(partition -> {
           HoodieData<HoodieRecord> secondaryIndexRecords;
           try {
-            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, writeStatus);
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, partition, instantTime);
           } catch (Exception e) {
             throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
           }
           partitionToRecordMap.put(partition, secondaryIndexRecords);
         });
   }
 
-  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, HoodieData<WriteStatus> writeStatus) throws Exception {
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String indexPartition, String instantTime) throws Exception {
     List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = getPartitionFilePairs(commitMetadata);
     // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
     // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
     // operation is going to be expensive (in CPU, memory and IO)
-    List<String> keysToRemove = new ArrayList<>();
-    writeStatus.collectAsList().forEach(status -> {

Review Comment:
   wow. this would have OOM-ed anyway, since we are collecting per-record delegates on the driver.
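   For reference, a rough sketch of how the removed path could have stayed distributed instead of collecting per-record delegates on the driver (assumes `WriteStatus#getWrittenRecordDelegates` and `HoodieRecordDelegate#getNewLocation`; exact signatures may differ):
   ```java
   // sketch only: derive deleted keys without a driver-side collectAsList()
   HoodieData<String> deletedKeys = writeStatus.flatMap(status ->
       status.getWrittenRecordDelegates().stream()
           .filter(delegate -> !delegate.getNewLocation().isPresent()) // no new location => deleted
           .map(HoodieRecordDelegate::getRecordKey)
           .iterator());
   ```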



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -42,16 +43,19 @@
 public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
 
   private final LogRecordScannerCallback callback;
+  private final DeletionCallback deletionCallback;
 
   private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean reverseReader, int bufferSize,
-                                         LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
+                                         LogRecordScannerCallback callback, DeletionCallback deletionCallback,
+                                         Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
                                          Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
     super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
         false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
          hoodieTableMetaClientOption);
     this.callback = callback;
+    this.deletionCallback = deletionCallback;

Review Comment:
   scanner is read-only. Why do we need a deletion callback here?
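   Presumably the intent is only to surface delete-block entries to the consumer while keeping the scanner itself read-only. A rough sketch of how that might be wired, assuming the base scanner exposes a `processNextDeletedRecord` hook like the other log record readers:
   ```java
   // sketch only: forward the keys of deleted records to the consumer
   @Override
   protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
     if (deletionCallback != null) {
       deletionCallback.apply(new HoodieKey(deleteRecord.getRecordKey(), deleteRecord.getPartitionPath()));
     }
   }
   ```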



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      HoodieCommitMetadata commitMetadata,
+                                                                      HoodieMetadataConfig metadataConfig,
+                                                                      HoodieTableMetaClient dataTableMetaClient,
+                                                                      int writesFileIdEncoding,
+                                                                      String instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process log files.
+      boolean anyLogFilesWithDeletes = allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath, writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {

Review Comment:
   Needs a UT.
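   Something along these lines could cover it (sketch only; assumes `reduceByKeys` is reachable from the test, e.g. via `@VisibleForTesting`, and uses `HoodieListData` as the in-memory `HoodieData`):
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;

   import java.util.Arrays;
   import java.util.List;

   import org.apache.hudi.common.data.HoodieData;
   import org.apache.hudi.common.data.HoodieListData;
   import org.apache.hudi.common.model.HoodieRecord;
   import org.junit.jupiter.api.Test;

   public class TestReduceByKeys {

     @Test
     public void insertWinsOverDeleteForSameKey() {
       // same record key moved partitions: delete from the old partition, insert into the new one
       HoodieRecord insert = HoodieMetadataPayload.createRecordIndexUpdate(
           "key1", "newPartition", "fileId1", "0020", 0);
       HoodieRecord delete = HoodieMetadataPayload.createRecordIndexDelete("key1");

       HoodieData<HoodieRecord> input = HoodieListData.eager(Arrays.asList(insert, delete));
       List<HoodieRecord> reduced = HoodieTableMetadataUtil.reduceByKeys(input, 1).collectAsList();

       // the delete entry should be dropped in favour of the insert
       assertEquals(1, reduced.size());
       assertEquals("key1", reduced.get(0).getRecordKey());
     }
   }
   ```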



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();

Review Comment:
   nts: check that this does not copy all records into memory or something similar.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+  /**
+   * Generates RLI Metadata records for base files.
+   * If a base file is added to a new file group, all record keys are treated as inserts.
+   * If a base file is added to an existing file group, we read the previous base file in addition to the latest base file of interest, find deleted records, and generate RLI Metadata records
+   * for them in addition to new insert records.
+   * @param basePath base path of the table.
+   * @param writeStat {@link HoodieWriteStat} of interest.
+   * @param writesFileIdEncoding fileID encoding for the table.
+   * @param instantTime instant time of interest.
+   * @param storage instance of {@link HoodieStorage}.
+   * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+   * @throws IOException
+   */
+  public static Iterator<HoodieRecord> generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+                                                                                   HoodieWriteStat writeStat,
+                                                                                   Integer writesFileIdEncoding,
+                                                                                   String instantTime,
+                                                                                   HoodieStorage storage) throws IOException {
+    String partition = writeStat.getPartitionPath();
+    String latestFileName = FSUtils.getFileNameFromPath(writeStat.getPath());
+    String previousFileName = writeStat.getPrevBaseFile();
+    String fileId = FSUtils.getFileId(latestFileName);
+    Set<String> recordKeysFromLatestBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, latestFileName);
+    if (previousFileName == null) {
+      return recordKeysFromLatestBaseFile.stream().map(recordKey -> (HoodieRecord) HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
+          instantTime, writesFileIdEncoding)).collect(toList()).iterator();
+    } else {
+      // read from previous base file and find difference to also generate delete records.
+      // we will return new inserts and deletes from this code block
+      Set<String> recordKeysFromPreviousBaseFile = getRecordKeysFromBaseFile(storage, basePath, partition, previousFileName);
+      List<HoodieRecord> toReturn = recordKeysFromPreviousBaseFile.stream()
+          .filter(recordKey -> {
+            // deleted record
+            return !recordKeysFromLatestBaseFile.contains(recordKey);
+          }).map(recordKey -> HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList());
+
+      toReturn.addAll(recordKeysFromLatestBaseFile.stream()
+          .filter(recordKey -> {
+            // new inserts
+            return !recordKeysFromPreviousBaseFile.contains(recordKey);
+          }).map(recordKey ->
+              HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
+                  instantTime, writesFileIdEncoding)).collect(toList()));
+      return toReturn.iterator();
+    }
+  }
+
+  public static List<String> getRecordKeysDeletedOrUpdated(String basePath,

Review Comment:
   There is a method with the same name in HoodieTableMetadataUtil - `getRecordKeysDeletedOrUpdated`.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();
+  }
+
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext 
engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName);
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();

Review Comment:
   same. 



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {

Review Comment:
   we cannot assume the base file is parquet. We need to check all supported base file formats.
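   For instance, the check could be driven by the table's configured base file format instead of hard-coding parquet (sketch; assumes the format can be resolved once from the table config, outside the closure):
   ```java
   // sketch only: resolve the base file extension from the table's config once,
   // outside the flatMap closure, instead of hard-coding parquet
   String baseFileExtension = dataTableMetaClient.getTableConfig().getBaseFileFormat().getFileExtension();
   // then inside the flatMap:
   if (writeStat.getPath().endsWith(baseFileExtension)) {
     return BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(
         basePath, writeStat, writesFileIdEncoding, instantTime, storage);
   }
   ```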



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1080,12 +1080,11 @@ class TestSecondaryIndexPruning extends SparkClientFunctionalTestHarness {
       )
       verifyQueryPredicate(hudiOpts, "not_record_key_col")
 
-      // update the secondary key column by insert.
-      spark.sql(s"insert into $tableName values (5, 'row2',  'efg', 'p2')")
+      // update the secondary key column by update.
+      spark.sql(s"update $tableName set not_record_key_col = 'efg' where record_key_col = 'row2'")
       confirmLastCommitType(ActionType.replacecommit)
       // validate the secondary index records themselves
       checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from hudi_metadata('$basePath') where type=7")(
-        Seq(s"cde${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}row2", false),

Review Comment:
   this is not present now - is that due to deletes being handled differently?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -370,9 +381,9 @@ public void setPath(StoragePath basePath, StoragePath path) {
   @Override
   public String toString() {
     return "HoodieWriteStat{fileId='" + fileId + '\'' + ", path='" + path + '\'' + ", prevCommit='" + prevCommit
-        + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes + ", numUpdateWrites=" + numUpdateWrites
-        + ", totalWriteBytes=" + totalWriteBytes + ", totalWriteErrors=" + totalWriteErrors + ", tempPath='" + tempPath
-        + '\'' + ", cdcStats='" + JsonUtils.toString(cdcStats)
+        + '\'' + ", prevBaseFile=" + prevBaseFile + '\'' + ", numWrites=" + numWrites + ", numDeletes=" + numDeletes

Review Comment:
   Will reformat / clean this up; it looks too busy.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+  /**
+   * Generates RLI Metadata records for base files.
+   * If base file is a added to a new file group, all record keys are treated 
as inserts.
+   * If a base file is added to an existing file group, we read previous base 
file in addition to the latest base file of interest. Find deleted records and 
generate RLI Metadata records
+   * for the same in addition to new insert records.
+   * @param basePath base path of the table.
+   * @param writeStat {@link HoodieWriteStat} of interest.
+   * @param writesFileIdEncoding fileID encoding for the table.
+   * @param instantTime instant time of interest.
+   * @param storage instance of {@link HoodieStorage}.
+   * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+   * @throws IOException
+   */
+  public static Iterator<HoodieRecord> 
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,

Review Comment:
   We should get into the habit of creating clean classes for core functionality; otherwise these "helper" methods just grow and grow.
   
   Will clean this up.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1667,51 +1660,6 @@ private void fetchOutofSyncFilesRecordsFromMetadataTable(Map<String, DirectoryIn
     }
   }
 
-  /**
-   * Return records that represent upserts to the record index due to write operation on the dataset.
-   *
-   * @param writeStatuses {@code WriteStatus} from the write operation
-   */
-  private HoodieData<HoodieRecord> getRecordIndexUpserts(HoodieData<WriteStatus> writeStatuses) {

Review Comment:
   need to confirm this was all just moved somewhere else



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -95,6 +103,14 @@ public interface LogRecordScannerCallback {
     void apply(HoodieRecord<?> record) throws Exception;
   }
 
+  /**
+   * A callback for log record scanner to consume deleted HoodieKeys.
+   */
+  @FunctionalInterface
+  public interface DeletionCallback {

Review Comment:
   ok. this is a new one, same name as the one in LogFormatWriter, where it's used for log files being deleted.
   
   rename: `RecordDeletionCallback`
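   i.e. something like this (the callback's exact signature isn't visible in this hunk; assuming it consumes the deleted record's `HoodieKey`, per the javadoc):
   ```java
   /**
    * A callback for the log record scanner to consume the keys of deleted records.
    */
   @FunctionalInterface
   public interface RecordDeletionCallback {
     void apply(HoodieKey deletedKey);
   }
   ```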



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -173,6 +190,11 @@ public Builder withLogRecordScannerCallback(LogRecordScannerCallback callback) {
       return this;
     }
 
+    public Builder withLogRecordScannerCallbackForDeletedKeys(DeletionCallback deletionCallback) {

Review Comment:
   naming - same comment as above.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
     try {
       String latestValidFilePath = baseFileToMerge.getFileName();
       writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);

Review Comment:
   Why store just the base file? E.g. if the merge handle was called during compaction, don't we need the entire file slice?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.FileFormatUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.stream.Collectors.toList;
+
+public class BaseFileRecordParsingUtils {
+
+  /**
+   * Generates RLI Metadata records for base files.
+   * If base file is a added to a new file group, all record keys are treated 
as inserts.
+   * If a base file is added to an existing file group, we read previous base 
file in addition to the latest base file of interest. Find deleted records and 
generate RLI Metadata records
+   * for the same in addition to new insert records.
+   * @param basePath base path of the table.
+   * @param writeStat {@link HoodieWriteStat} of interest.
+   * @param writesFileIdEncoding fileID encoding for the table.
+   * @param instantTime instant time of interest.
+   * @param storage instance of {@link HoodieStorage}.
+   * @return Iterator of {@link HoodieRecord}s for RLI Metadata partition.
+   * @throws IOException
+   */
+  public static Iterator<HoodieRecord> 
generateRLIMetadataHoodieRecordsForBaseFile(String basePath,
+                                                                               
    HoodieWriteStat writeStat,
+                                                                               
    Integer writesFileIdEncoding,

Review Comment:
   there is no enum for this?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "

Review Comment:
   What if there are two delete records, say with no precombine during the write? I think we need to handle the (false, false) and (true, true) cases differently.
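   For illustration, a sketch of a reducer that calls out both cases explicitly (whether duplicate deletes can simply be collapsed is an assumption; duplicate live entries may still deserve a hard failure):
   ```java
   // sketch only
   .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, HoodieRecord>) (record1, record2) -> {
     boolean isRecord1Deleted = record1.getData() instanceof EmptyHoodieRecordPayload;
     boolean isRecord2Deleted = record2.getData() instanceof EmptyHoodieRecordPayload;
     if (isRecord1Deleted && isRecord2Deleted) {
       // two deletes for the same key (e.g. no precombine on the writer): either one suffices
       return record1;
     } else if (!isRecord1Deleted && !isRecord2Deleted) {
       // two live entries for the same key likely indicates a duplicate-key write; keep failing here
       throw new HoodieIOException("Duplicate record index updates for record key " + record1.getRecordKey());
     } else {
       // exactly one delete: prefer the live record
       return isRecord1Deleted ? record2 : record1;
     }
   }, parallelism).values();
   ```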



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();
+  }
+
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext 
engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName);
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFiles) {
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, 
storage).iterator();
+            } else {
+              // for logs, every entry is either an update or a delete
+              StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+              return getRecordKeys(fullFilePath.toString(), 
dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, 
instantTime).iterator();
+            }
+          }).collectAsList();

Review Comment:
   this is collecting something at the record level on the driver. Will need to check for OOM.
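   One option would be to return the keys as distributed `HoodieData<String>` and let the caller decide whether materializing them on the driver is safe (sketch; assumes callers can be adapted to the new return type):
   ```java
   // sketch only: same flatMap body as today, minus the trailing collectAsList()
   HoodieData<String> recordKeys = engineContext.parallelize(allWriteStats, parallelism)
       .flatMap(writeStat -> {
         HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
         if (writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
           return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator();
         }
         // for logs, every entry is either an update or a delete
         StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
         return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime).iterator();
       });
   return recordKeys; // callers materialize only when the result is known to be small
   ```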



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,

Review Comment:
   we need a UT for `getDeletedRecordKeys`.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)

Review Comment:
   good catch



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1311,6 +1447,53 @@ public static 
List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeM
     return Collections.emptyList();
   }
 
+  @VisibleForTesting
+  public static Set<String> getDeletedRecordKeys(String filePath, 
HoodieTableMetaClient datasetMetaClient,
+                                                 Option<Schema> 
writerSchemaOpt, int maxBufferSize,
+                                                 String latestCommitTimestamp) 
throws IOException {
+    if (writerSchemaOpt.isPresent()) {
+      // read log file records without merging
+      Set<String> deletedKeys = new HashSet<>();
+      HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
+          .withStorage(datasetMetaClient.getStorage())
+          .withBasePath(datasetMetaClient.getBasePath())
+          .withLogFilePaths(Collections.singletonList(filePath))
+          .withBufferSize(maxBufferSize)
+          .withLatestInstantTime(latestCommitTimestamp)
+          .withReaderSchema(writerSchemaOpt.get())
+          .withTableMetaClient(datasetMetaClient)
+          .withLogRecordScannerCallbackForDeletedKeys(deletedKey -> 
deletedKeys.add(deletedKey.getRecordKey()))
+          .build();
+      scanner.scan();
+      return deletedKeys;
+    }
+    return Collections.emptySet();
+  }
+
+  @VisibleForTesting
+  public static Set<String> getRecordKeys(String filePath, 
HoodieTableMetaClient datasetMetaClient,
+                                                 Option<Schema> 
writerSchemaOpt, int maxBufferSize,
+                                                 String latestCommitTimestamp) 
throws IOException {
+    if (writerSchemaOpt.isPresent()) {

Review Comment:
   code duplication
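   One way to fold the two into a single helper (a sketch under the assumption that only the builder methods already quoted in this diff are available; the callback each caller attaches is the only real difference):

   ```java
   // Shared scanner setup; each caller attaches its own callback to the builder and
   // decides which keys end up in the result set.
   private static Set<String> scanLogForKeys(String filePath, HoodieTableMetaClient datasetMetaClient,
                                             Schema readerSchema, int maxBufferSize, String latestCommitTimestamp,
                                             BiConsumer<HoodieUnMergedLogRecordScanner.Builder, Set<String>> attachCallback) {
     Set<String> keys = new HashSet<>();
     HoodieUnMergedLogRecordScanner.Builder builder = HoodieUnMergedLogRecordScanner.newBuilder()
         .withStorage(datasetMetaClient.getStorage())
         .withBasePath(datasetMetaClient.getBasePath())
         .withLogFilePaths(Collections.singletonList(filePath))
         .withBufferSize(maxBufferSize)
         .withLatestInstantTime(latestCommitTimestamp)
         .withReaderSchema(readerSchema)
         .withTableMetaClient(datasetMetaClient);
     attachCallback.accept(builder, keys);
     builder.build().scan();
     return keys;
   }

   // getDeletedRecordKeys then reduces to:
   //   return writerSchemaOpt.isPresent()
   //       ? scanLogForKeys(filePath, datasetMetaClient, writerSchemaOpt.get(), maxBufferSize, latestCommitTimestamp,
   //           (builder, keys) -> builder.withLogRecordScannerCallbackForDeletedKeys(k -> keys.add(k.getRecordKey())))
   //       : Collections.emptySet();
   ```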



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();
+  }
+
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext 
engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {

Review Comment:
   code duplication
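   The repeated prelude (log-file detection plus schema resolution) could also be hoisted into one helper; a sketch, with the per-caller difference expressed as a predicate (names here are not from the PR):

   ```java
   // Resolve the writer schema only when some write stat points at a log file that the
   // caller actually needs to open; the predicate is the only part that differs between
   // convertMetadataToRecordIndexRecords (deletes only) and getRecordKeysDeletedOrUpdated (any log file).
   private static Option<Schema> resolveWriterSchemaIfNeeded(List<HoodieWriteStat> allWriteStats,
                                                             Predicate<HoodieWriteStat> logFileNeedsScan,
                                                             HoodieTableMetaClient dataTableMetaClient) {
     boolean anyRelevantLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
       String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
       return FSUtils.isLogFile(fileName) && logFileNeedsScan.test(writeStat);
     });
     return anyRelevantLogFiles ? tryResolveSchemaForTable(dataTableMetaClient) : Option.empty();
   }

   // Call sites:
   //   resolveWriterSchemaIfNeeded(allWriteStats, ws -> ws.getNumDeletes() > 0, dataTableMetaClient);
   //   resolveWriterSchemaIfNeeded(allWriteStats, ws -> true, dataTableMetaClient);
   ```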



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -171,6 +171,7 @@ private void init(String fileId, String partitionPath, 
HoodieBaseFile baseFileTo
     try {
       String latestValidFilePath = baseFileToMerge.getFileName();
       writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);

Review Comment:
   do other handles set this as well? yes? no? why?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,132 @@ public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(Hoodi
         });
   }
 
+  static HoodieData<HoodieRecord> 
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      int 
writesFileIdEncoding,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFilesWithDeletes = 
allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      HoodieData<HoodieRecord> recordIndexRecords = 
engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
 writeStat, writesFileIdEncoding, instantTime, storage);
+            } else {
+              // for logs, we only need to process log files containing deletes
+              if (writeStat.getNumDeletes() > 0) {
+                StoragePath fullFilePath = new 
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+                Set<String> deletedRecordKeys = 
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+                    finalWriterSchemaOpt, maxBufferSize, instantTime);
+                return deletedRecordKeys.stream().map(recordKey -> 
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+              }
+              // ignore log file data blocks.
+              return new ArrayList<HoodieRecord>().iterator();
+            }
+          });
+
+      // there are chances that same record key from data table has 2 entries 
(1 delete from older partition and 1 insert to newer partition)
+      // lets do reduce by key to ignore the deleted entry.
+      return reduceByKeys(recordIndexRecords, parallelism);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate RLI records for metadata 
table", e);
+    }
+  }
+
+  /**
+   * There are chances that same record key from data table has 2 entries (1 
delete from older partition and 1 insert to newer partition)
+   * So, this method performs reduce by key to ignore the deleted entry.
+   * @param recordIndexRecords hoodie records after rli index lookup.
+   * @param parallelism parallelism to use.
+   * @return
+   */
+  private static HoodieData<HoodieRecord> 
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+    return recordIndexRecords.mapToPair(
+            (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>) 
t -> Pair.of(t.getKey(), t))
+        .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord, 
HoodieRecord>) (record1, record2) -> {
+          boolean isRecord1Deleted = record1.getData() instanceof 
EmptyHoodieRecordPayload;
+          boolean isRecord2Deleted = record2.getData() instanceof 
EmptyHoodieRecordPayload;
+          if (isRecord1Deleted && !isRecord2Deleted) {
+            return record2;
+          } else if (!isRecord1Deleted && isRecord2Deleted) {
+            return record1;
+          } else {
+            throw new HoodieIOException("Two HoodieRecord updates to RLI is 
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+                + record1.getData().toString() + ", record 2 : " + 
record2.getData().toString());
+          }
+        }, parallelism).values();
+  }
+
+  @VisibleForTesting
+  public static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext 
engineContext,
+                                                                      
HoodieCommitMetadata commitMetadata,
+                                                                      
HoodieMetadataConfig metadataConfig,
+                                                                      
HoodieTableMetaClient dataTableMetaClient,
+                                                                      String 
instantTime) {
+
+    List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+        .flatMap(Collection::stream).collect(Collectors.toList());
+
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try {
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
metadataConfig.getRecordIndexMaxParallelism()), 1);
+      String basePath = dataTableMetaClient.getBasePath().toString();
+      // we might need to set some additional variables if we need to process 
log files.
+      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
+        String fileName = FSUtils.getFileName(writeStat.getPath(), 
writeStat.getPartitionPath());
+        return FSUtils.isLogFile(fileName);
+      });
+      Option<Schema> writerSchemaOpt = Option.empty();
+      if (anyLogFiles) {
+        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+      }
+      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+      StorageConfiguration storageConfiguration = 
dataTableMetaClient.getStorageConf();
+      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+      return engineContext.parallelize(allWriteStats, parallelism)
+          .flatMap(writeStat -> {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(new 
StoragePath(writeStat.getPath()), storageConfiguration);
+            // handle base files
+            if 
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+              return 
BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, 
storage).iterator();
+            } else {
+              // for logs, every entry is either an update or a delete

Review Comment:
   do we throw an error if we find numInserts > 0 for logs?
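   If the answer is that we should, a guard along these lines (not in the PR as quoted) would make the assumption explicit:

   ```java
   // Fail fast if a log-file write stat reports inserts, since this path assumes log
   // entries are only updates or deletes.
   if (writeStat.getNumInserts() > 0) {
     throw new HoodieException("Unexpected " + writeStat.getNumInserts() + " inserts in log file "
         + writeStat.getPath() + " while generating record index updates for instant " + instantTime);
   }
   ```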



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
