This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c12decc86c [HUDI-6762] Removed usages of MetadataRecordsGenerationParams (#10962)
7c12decc86c is described below

commit 7c12decc86ccfbdc8bd06fe64d3e1c507cbbfbf6
Author: Vova Kolmakov <wombatu...@gmail.com>
AuthorDate: Tue Apr 16 00:05:57 2024 +0700

    [HUDI-6762] Removed usages of MetadataRecordsGenerationParams (#10962)
    
    Co-authored-by: Vova Kolmakov <kolmakov.vladi...@huawei-partners.com>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 158 ++++++------
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 281 ++++++++++++---------
 .../metadata/MetadataRecordsGenerationParams.java  |  89 -------
 3 files changed, 234 insertions(+), 294 deletions(-)
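
Two patterns recur throughout the diff below: the MetadataRecordsGenerationParams holder is removed in favor of passing only the values each method actually needs as explicit arguments, and string-concatenation / String.format log calls are switched to SLF4J parameterized logging. The standalone sketch below is not part of the commit; GenerationParams, convertToRecords and the other names are hypothetical stand-ins for the real Hudi classes, and it assumes only slf4j-api on the classpath.

// Illustrative sketch only -- not Hudi code. All names below are hypothetical
// stand-ins; the only dependency assumed is slf4j-api.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class ParamsRefactorSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ParamsRefactorSketch.class);

  // Before: a parameter object bundling settings, analogous to the removed
  // MetadataRecordsGenerationParams.
  static final class GenerationParams {
    final int bloomIndexParallelism;
    final boolean columnStatsEnabled;
    final List<String> targetColumns;

    GenerationParams(int bloomIndexParallelism, boolean columnStatsEnabled, List<String> targetColumns) {
      this.bloomIndexParallelism = bloomIndexParallelism;
      this.columnStatsEnabled = columnStatsEnabled;
      this.targetColumns = targetColumns;
    }
  }

  // After: the method declares only the values it uses, so callers no longer
  // have to build (or thread through) the whole parameter object.
  static int convertToRecords(int workItems, int bloomIndexParallelism) {
    // Same clamping rule that appears in the diff:
    // parallelism = max(min(size, configuredParallelism), 1)
    int parallelism = Math.max(Math.min(workItems, bloomIndexParallelism), 1);
    // Parameterized logging: placeholders are rendered only if INFO is enabled,
    // replacing "..." + var concatenation and String.format inside log calls.
    LOG.info("Processing {} work items with parallelism {}", workItems, parallelism);
    return parallelism;
  }

  public static void main(String[] args) {
    GenerationParams oldStyle = new GenerationParams(8, true, Arrays.asList("a", "b"));
    // An old-style call site would pass the whole object; the new style passes fields directly.
    convertToRecords(3, oldStyle.bloomIndexParallelism);
  }
}

In the commit itself the same idea shows up at call sites such as convertFilesToColumnStatsRecords, which now receive the meta client, parallelism and column-stats settings directly instead of a MetadataRecordsGenerationParams instance.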

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 891cc88b9da..dea317e60b7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -262,7 +262,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
       if (!dataWriteConfig.isMetadataAsyncIndex()) {
         Set<String> completedPartitions = 
dataMetaClient.getTableConfig().getMetadataPartitions();
-        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + completedPartitions);
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: {}", completedPartitions);
         // TODO: fix the filter to check for exact partition name, e.g. 
completedPartitions could have func_index_datestr,
         //       but now the user is trying to initialize the 
func_index_dayhour partition.
         this.enabledPartitionTypes.stream()
@@ -345,12 +345,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       LOG.warn("Metadata Table will need to be re-initialized as no instants 
were found");
       return true;
     }
-
-    final String latestMetadataInstantTimestamp = 
latestMetadataInstant.get().getTimestamp();
-    if (latestMetadataInstantTimestamp.startsWith(SOLO_COMMIT_TIMESTAMP)) { // 
the initialization timestamp is SOLO_COMMIT_TIMESTAMP + offset
-      return false;
-    }
-
     return false;
   }
 
@@ -411,8 +405,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     for (MetadataPartitionType partitionType : partitionsToInit) {
       // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
       String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
-
-      LOG.info("Initializing MDT partition " + partitionType.name() + " at 
instant " + commitTimeForPartition);
+      String partitionTypeName = partitionType.name();
+      LOG.info("Initializing MDT partition {} at instant {}", 
partitionTypeName, commitTimeForPartition);
 
       Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
       try {
@@ -438,37 +432,41 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
             fileGroupCountAndRecordsPair = 
initializeFunctionalIndexPartition(functionalIndexPartitionsToInit.iterator().next());
             break;
           default:
-            throw new HoodieMetadataException("Unsupported MDT partition type: 
" + partitionType);
+            throw new HoodieMetadataException(String.format("Unsupported MDT 
partition type: %s", partitionType));
         }
       } catch (Exception e) {
         String metricKey = partitionType.getPartitionPath() + "_" + 
HoodieMetadataMetrics.BOOTSTRAP_ERR_STR;
         metrics.ifPresent(m -> m.setMetric(metricKey, 1));
-        LOG.error("Bootstrap on " + partitionType.getPartitionPath() + " 
partition failed for "
-            + metadataMetaClient.getBasePath(), e);
-        throw new HoodieMetadataException(partitionType.getPartitionPath()
-            + " bootstrap failed for " + metadataMetaClient.getBasePath(), e);
+        String errMsg = String.format("Bootstrap on %s partition failed for 
%s",
+            partitionType.getPartitionPath(), 
metadataMetaClient.getBasePathV2());
+        LOG.error(errMsg, e);
+        throw new HoodieMetadataException(errMsg, e);
       }
 
-      LOG.info(String.format("Initializing %s index with %d mappings and %d 
file groups.", partitionType.name(), fileGroupCountAndRecordsPair.getKey(),
-          fileGroupCountAndRecordsPair.getValue().count()));
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Initializing {} index with {} mappings and {} file groups.", 
partitionTypeName, fileGroupCountAndRecordsPair.getKey(),
+            fileGroupCountAndRecordsPair.getValue().count());
+      }
       HoodieTimer partitionInitTimer = HoodieTimer.start();
 
       // Generate the file groups
       final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
-      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType.name() + " should be > 0");
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionTypeName + " should be > 0");
       initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
 
       // Perform the commit using bulkCommit
       HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
       bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
       metadataMetaClient.reloadActiveTimeline();
-      String partitionPath = (partitionType == FUNCTIONAL_INDEX) ? 
dataWriteConfig.getFunctionalIndexConfig().getIndexName() : 
partitionType.getPartitionPath();
+      String partitionPath = partitionType == FUNCTIONAL_INDEX
+          ? dataWriteConfig.getFunctionalIndexConfig().getIndexName()
+          : partitionType.getPartitionPath();
 
       
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionPath, true);
       // initialize the metadata reader again so the MDT partition can be read 
after initialization
       initMetadataReader();
       long totalInitTime = partitionInitTimer.endTimer();
-      LOG.info(String.format("Initializing %s index in metadata table took " + 
totalInitTime + " in ms", partitionType.name()));
+      LOG.info("Initializing {} index in metadata table took {} in ms", 
partitionTypeName, totalInitTime);
     }
 
     return true;
@@ -502,7 +500,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
     HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+            dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
 
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
@@ -510,7 +509,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
     HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
createInstantTime, dataMetaClient,
+            dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.getBloomFilterType());
 
     final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
@@ -568,8 +568,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
           .map(basefile -> Pair.of(partition, 
basefile)).collect(Collectors.toList()));
     }
 
-    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
-        + partitions.size() + " partitions");
+    LOG.info("Initializing record index from {} base files in {} partitions", 
partitionBaseFilePairs.size(), partitions.size());
 
     // Collect record keys from the files in parallel
     HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(
@@ -590,7 +589,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
         dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
 
-    LOG.info(String.format("Initializing record index with %d mappings and %d 
file groups.", recordCount, fileGroupCount));
+    LOG.info("Initializing record index with {} mappings and {} file groups.", 
recordCount, fileGroupCount);
     return Pair.of(fileGroupCount, records);
   }
 
@@ -632,8 +631,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
     if (!pendingDataInstant.isEmpty()) {
       metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BOOTSTRAP_ERR_STR, 1));
-      LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: "
-          + Arrays.toString(pendingDataInstant.toArray()));
+      LOG.warn("Cannot initialize metadata table as operation(s) are in 
progress on the dataset: {}",
+          Arrays.toString(pendingDataInstant.toArray()));
       return true;
     }
     return false;
@@ -672,7 +671,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     final int fileListingParallelism = 
metadataWriteConfig.getFileListingParallelism();
     SerializableConfiguration conf = new 
SerializableConfiguration(dataMetaClient.getHadoopConf());
     final String dirFilterRegex = 
dataWriteConfig.getMetadataConfig().getDirectoryFilterRegex();
-    final String datasetBasePath = dataMetaClient.getBasePath();
+    final String datasetBasePath = dataMetaClient.getBasePathV2().toString();
     SerializablePath serializableBasePath = new SerializablePath(new 
CachingPath(datasetBasePath));
 
     while (!pathsToList.isEmpty()) {
@@ -694,7 +693,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         if (!dirFilterRegex.isEmpty()) {
           final String relativePath = dirInfo.getRelativePath();
           if (!relativePath.isEmpty() && relativePath.matches(dirFilterRegex)) 
{
-            LOG.info("Ignoring directory " + relativePath + " which matches 
the filter regex " + dirFilterRegex);
+            LOG.info("Ignoring directory {} which matches the filter regex 
{}", relativePath, dirFilterRegex);
             continue;
           }
         }
@@ -750,7 +749,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     try {
       final FileStatus[] existingFiles = fs.listStatus(partitionPath);
       if (existingFiles.length > 0) {
-        LOG.warn("Deleting all existing files found in MDT partition " + 
partitionName);
+        LOG.warn("Deleting all existing files found in MDT partition {}", 
partitionName);
         fs.delete(partitionPath, true);
         ValidationUtils.checkState(!fs.exists(partitionPath), "Failed to 
delete MDT partition " + partitionName);
       }
@@ -795,7 +794,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         writer.appendBlock(block);
         writer.close();
       } catch (InterruptedException e) {
-        throw new HoodieException("Failed to created fileGroup " + 
fileGroupFileId + " for partition " + partitionName, e);
+        throw new HoodieException(String.format("Failed to created fileGroup 
%s for partition %s", fileGroupFileId, partitionName), e);
       }
     }, fileGroupFileIds.size());
   }
@@ -805,10 +804,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       String partitionPath = partitionType.getPartitionPath();
       // first update table config
       
dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionPath, false);
-      LOG.warn("Deleting Metadata Table partition: " + partitionPath);
+      LOG.warn("Deleting Metadata Table partition: {}", partitionPath);
       dataMetaClient.getFs().delete(new 
Path(metadataWriteConfig.getBasePath(), partitionPath), true);
       // delete corresponding pending indexing instant file in the timeline
-      LOG.warn("Deleting pending indexing instant from the timeline for 
partition: " + partitionPath);
+      LOG.warn("Deleting pending indexing instant from the timeline for 
partition: {}", partitionPath);
       deletePendingIndexingInstant(dataMetaClient, partitionPath);
     }
     closeInternal();
@@ -830,7 +829,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
               
metaClient.getActiveTimeline().deleteInstantFileIfExists(getIndexInflightInstant(instant.getTimestamp()));
             }
           } catch (IOException e) {
-            LOG.error("Failed to delete the instant file corresponding to " + 
instant);
+            LOG.error("Failed to delete the instant file corresponding to {}", 
instant);
           }
         });
   }
@@ -850,18 +849,6 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     }
   }
 
-  private MetadataRecordsGenerationParams getRecordsGenerationParams() {
-    return new MetadataRecordsGenerationParams(
-        dataMetaClient,
-        enabledPartitionTypes,
-        dataWriteConfig.getBloomFilterType(),
-        dataWriteConfig.getMetadataBloomFilterIndexParallelism(),
-        dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
-        dataWriteConfig.getColumnStatsIndexParallelism(),
-        dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
-        dataWriteConfig.getColumnsEnabledForBloomFilterIndex());
-  }
-
   /**
    * Interface to assist in converting commit metadata to List of 
HoodieRecords to be written to metadata table.
    * Updates of different commit metadata uses the same method to convert to 
HoodieRecords and hence.
@@ -913,12 +900,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     List<MetadataPartitionType> partitionTypes = new ArrayList<>();
     indexPartitionInfos.forEach(indexPartitionInfo -> {
       String relativePartitionPath = 
indexPartitionInfo.getMetadataPartitionPath();
-      LOG.info(String.format("Creating a new metadata index for partition '%s' 
under path %s upto instant %s",
-          relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime));
+      LOG.info("Creating a new metadata index for partition '{}' under path {} 
upto instant {}",
+          relativePartitionPath, metadataWriteConfig.getBasePath(), 
indexUptoInstantTime);
 
       // return early and populate enabledPartitionTypes correctly (check in 
initialCommit)
-      MetadataPartitionType partitionType = 
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
 ? FUNCTIONAL_INDEX :
-          
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
+      MetadataPartitionType partitionType = 
relativePartitionPath.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX)
+          ? FUNCTIONAL_INDEX
+          : 
MetadataPartitionType.valueOf(relativePartitionPath.toUpperCase(Locale.ROOT));
       if (!enabledPartitionTypes.contains(partitionType)) {
         throw new HoodieIndexException(String.format("Indexing for metadata 
partition: %s is not enabled", partitionType));
       }
@@ -944,7 +932,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     processAndCommit(instantTime, () -> {
       Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
-              engineContext, dataWriteConfig, commitMetadata, instantTime, 
getRecordsGenerationParams());
+              engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
+                  enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+                  dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+                  dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
 
       // Updates for record index are created by parsing the WriteStatus which 
is a hudi-client object. Hence, we cannot yet move this code
       // to the HoodieTableMetadataUtil class in hudi-common.
@@ -962,7 +953,10 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     processAndCommit(instantTime, () -> {
       Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
-              engineContext, dataWriteConfig, commitMetadata, instantTime, 
getRecordsGenerationParams());
+              engineContext, dataWriteConfig, commitMetadata, instantTime, 
dataMetaClient,
+                  enabledPartitionTypes, dataWriteConfig.getBloomFilterType(),
+                  dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+                  dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
       HoodieData<HoodieRecord> additionalUpdates = 
getRecordIndexAdditionalUpserts(records, commitMetadata);
       partitionToRecordMap.put(RECORD_INDEX, records.union(additionalUpdates));
       updateFunctionalIndexIfPresent(commitMetadata, instantTime, 
partitionToRecordMap);
@@ -983,7 +977,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
           try {
             functionalIndexRecords = getFunctionalIndexUpdates(commitMetadata, 
partition, instantTime);
           } catch (Exception e) {
-            throw new HoodieMetadataException("Failed to get functional index 
updates for partition " + partition, e);
+            throw new HoodieMetadataException(String.format("Failed to get 
functional index updates for partition %s", partition), e);
           }
           partitionToRecordMap.put(FUNCTIONAL_INDEX, functionalIndexRecords);
         });
@@ -1025,7 +1019,9 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   @Override
   public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        cleanMetadata, getRecordsGenerationParams(), instantTime));
+            cleanMetadata, instantTime, dataMetaClient, enabledPartitionTypes,
+            dataWriteConfig.getBloomIndexParallelism(), 
dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
+            dataWriteConfig.getColumnStatsIndexParallelism(), 
dataWriteConfig.getColumnsEnabledForColumnStatsIndex()));
     closeInternal();
   }
 
@@ -1040,22 +1036,22 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     dataMetaClient.reloadActiveTimeline();
 
     // Fetch the commit to restore to (savepointed commit time)
-    HoodieInstant restoreInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.RESTORE_ACTION, 
instantTime);
+    HoodieInstant restoreInstant = new HoodieInstant(REQUESTED, 
HoodieTimeline.RESTORE_ACTION, instantTime);
     HoodieInstant requested = 
HoodieTimeline.getRestoreRequestedInstant(restoreInstant);
     HoodieRestorePlan restorePlan = null;
     try {
       restorePlan = TimelineMetadataUtils.deserializeAvroMetadata(
           
dataMetaClient.getActiveTimeline().readRestoreInfoAsBytes(requested).get(), 
HoodieRestorePlan.class);
     } catch (IOException e) {
-      throw new HoodieIOException("Deserialization of restore plan failed 
whose restore instant time is " + instantTime + " in data table", e);
+      throw new HoodieIOException(String.format("Deserialization of restore 
plan failed whose restore instant time is %s in data table", instantTime), e);
     }
     final String restoreToInstantTime = 
restorePlan.getSavepointToRestoreTimestamp();
-    LOG.info("Triggering restore to " + restoreToInstantTime + " in metadata 
table");
+    LOG.info("Triggering restore to {} in metadata table", 
restoreToInstantTime);
 
     // fetch the earliest commit to retain and ensure the base file prior to 
the time to restore is present
     List<HoodieFileGroup> filesGroups = 
metadata.getMetadataFileSystemView().getAllFileGroups(FILES.getPartitionPath()).collect(Collectors.toList());
 
-    boolean cannotRestore = filesGroups.stream().map(fileGroup -> 
fileGroup.getAllFileSlices().map(fileSlice -> 
fileSlice.getBaseInstantTime()).anyMatch(
+    boolean cannotRestore = filesGroups.stream().map(fileGroup -> 
fileGroup.getAllFileSlices().map(FileSlice::getBaseInstantTime).anyMatch(
         instantTime1 -> HoodieTimeline.compareTimestamps(instantTime1, 
LESSER_THAN_OR_EQUALS, restoreToInstantTime))).anyMatch(canRestore -> 
!canRestore);
     if (cannotRestore) {
       throw new HoodieMetadataException(String.format("Can't restore to %s 
since there is no base file in MDT lesser than the commit to restore to. "
@@ -1117,13 +1113,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // The deltacommit that will be rolled back
       HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitToRollbackInstantTime);
       if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) {
-        LOG.info("Rolling back MDT deltacommit " + 
commitToRollbackInstantTime);
+        LOG.info("Rolling back MDT deltacommit {}", 
commitToRollbackInstantTime);
         if (!getWriteClient().rollback(commitToRollbackInstantTime, 
instantTime)) {
-          throw new HoodieMetadataException("Failed to rollback deltacommit at 
" + commitToRollbackInstantTime);
+          throw new HoodieMetadataException(String.format("Failed to rollback 
deltacommit at %s", commitToRollbackInstantTime));
         }
       } else {
-        LOG.info(String.format("Ignoring rollback of instant %s at %s. The 
commit to rollback is not found in MDT",
-            commitToRollbackInstantTime, instantTime));
+        LOG.info("Ignoring rollback of instant {} at {}. The commit to 
rollback is not found in MDT",
+            commitToRollbackInstantTime, instantTime);
       }
       closeInternal();
     }
@@ -1136,12 +1132,13 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     // The commit being rolled back should not be earlier than the latest 
compaction on the MDT because the latest file slice does not change after all.
     // Compaction on MDT only occurs when all actions are completed on the 
dataset.
     // Hence, this case implies a rollback of completed commit which should 
actually be handled using restore.
-    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+    if (compactionInstant.getAction().equals(COMMIT_ACTION)) {
       final String compactionInstantTime = compactionInstant.getTimestamp();
-      if (commitToRollbackInstantTime.length() == 
compactionInstantTime.length() && 
HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, 
compactionInstantTime)) {
-        throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is earlier than the latest compaction %s. "
-                + "There are %d deltacommits after this compaction: %s", 
commitToRollbackInstantTime, compactionInstantTime,
-            deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
+      if (commitToRollbackInstantTime.length() == 
compactionInstantTime.length() && 
LESSER_THAN_OR_EQUALS.test(commitToRollbackInstantTime, compactionInstantTime)) 
{
+        throw new HoodieMetadataException(
+            String.format("Commit being rolled back %s is earlier than the 
latest compaction %s. There are %d deltacommits after this compaction: %s",
+                commitToRollbackInstantTime, compactionInstantTime, 
deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants())
+        );
       }
     }
   }
@@ -1187,7 +1184,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
 
     if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
       // if this is a new commit being applied to metadata for the first time
-      LOG.info("New commit at " + instantTime + " being applied to MDT.");
+      LOG.info("New commit at {} being applied to MDT.", instantTime);
     } else {
       // this code path refers to a re-attempted commit that:
       //   1. got committed to metadata table, but failed in datatable.
@@ -1199,12 +1196,12 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // already part of completed commit. So, we have to manually rollback 
the completed instant and proceed.
       Option<HoodieInstant> alreadyCompletedInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry 
-> entry.getTimestamp().equals(instantTime))
           .lastInstant();
-      LOG.info(String.format("%s completed commit at %s being applied to MDT.",
-          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime));
+      LOG.info("{} completed commit at {} being applied to MDT.",
+          alreadyCompletedInstant.isPresent() ? "Already" : "Partially", 
instantTime);
 
       // Rollback the previous commit
       if (!writeClient.rollback(instantTime)) {
-        throw new HoodieMetadataException("Failed to rollback deltacommit at " 
+ instantTime + " from MDT");
+        throw new HoodieMetadataException(String.format("Failed to rollback 
deltacommit at %s from MDT", instantTime));
       }
       metadataMetaClient.reloadActiveTimeline();
     }
@@ -1271,7 +1268,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
         fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
       }
       final int fileGroupCount = fileSlices.size();
-      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionName + " should be >0");
+      ValidationUtils.checkArgument(fileGroupCount > 0, 
String.format("FileGroup count for MDT partition %s should be >0", 
partitionName));
 
       List<FileSlice> finalFileSlices = fileSlices;
       HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
@@ -1319,7 +1316,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       // Do timeline validation before scheduling compaction/logCompaction 
operations.
       if (validateCompactionScheduling()) {
         String latestDeltacommitTime = lastInstant.get().getTimestamp();
-        LOG.info("Latest deltacommit time found is " + latestDeltacommitTime + 
", running compaction operations.");
+        LOG.info("Latest deltacommit time found is {}, running compaction 
operations.", latestDeltacommitTime);
         compactIfNecessary(writeClient, latestDeltacommitTime);
       }
       writeClient.archive();
@@ -1368,17 +1365,17 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
     if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
-      LOG.info(String.format("Compaction with same %s time is already present 
in the timeline.", compactionInstantTime));
+      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
     } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
-      LOG.info("Compaction is scheduled for timestamp " + 
compactionInstantTime);
+      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
       writeClient.compact(compactionInstantTime);
     } else if (metadataWriteConfig.isLogCompactionEnabled()) {
       // Schedule and execute log compaction with new instant time.
       final String logCompactionInstantTime = 
metadataMetaClient.createNewInstantTime(false);
       if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(logCompactionInstantTime))
 {
-        LOG.info(String.format("Log compaction with same %s time is already 
present in the timeline.", logCompactionInstantTime));
+        LOG.info("Log compaction with same {} time is already present in the 
timeline.", logCompactionInstantTime);
       } else if 
(writeClient.scheduleLogCompactionAtInstant(logCompactionInstantTime, 
Option.empty())) {
-        LOG.info("Log compaction is scheduled for timestamp " + 
logCompactionInstantTime);
+        LOG.info("Log compaction is scheduled for timestamp {}", 
logCompactionInstantTime);
         writeClient.logCompact(logCompactionInstantTime);
       }
     }
@@ -1417,8 +1414,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       Option<HoodieInstant> pendingCompactionInstant =
           
metadataMetaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
       if (pendingLogCompactionInstant.isPresent() || 
pendingCompactionInstant.isPresent()) {
-        LOG.warn(String.format("Not scheduling compaction or logCompaction, 
since a pending compaction instant %s or logCompaction %s instant is present",
-            pendingCompactionInstant, pendingLogCompactionInstant));
+        LOG.warn("Not scheduling compaction or logCompaction, since a pending 
compaction instant {} or logCompaction {} instant is present",
+            pendingCompactionInstant, pendingLogCompactionInstant);
         return false;
       }
     }
@@ -1487,8 +1484,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
               // newLocation should have the same fileID as currentLocation. 
The instantTimes differ as newLocation's
               // instantTime refers to the current commit which was completed.
               if 
(!recordDelegate.getCurrentLocation().get().getFileId().equals(newLocation.get().getFileId()))
 {
-                final String msg = String.format("Detected update in location 
of record with key %s from %s "
-                        + " to %s. The fileID should not change.",
+                final String msg = String.format("Detected update in location 
of record with key %s from %s to %s. The fileID should not change.",
                     recordDelegate, recordDelegate.getCurrentLocation().get(), 
newLocation.get());
                 LOG.error(msg);
                 throw new HoodieMetadataException(msg);
@@ -1617,7 +1613,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
           // Regular HUDI data file (base file or log file)
           String dataFileCommitTime = 
FSUtils.getCommitTime(status.getPath().getName());
           // Limit the file listings to files which were created before the 
maxInstant time.
-          if (HoodieTimeline.compareTimestamps(dataFileCommitTime, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxInstantTime)) {
+          if (HoodieTimeline.compareTimestamps(dataFileCommitTime, 
LESSER_THAN_OR_EQUALS, maxInstantTime)) {
             filenameToSizeMap.put(status.getPath().getName(), status.getLen());
           }
         }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 8852b52a1be..d8270ed17d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -80,6 +80,7 @@ import org.apache.hudi.common.util.collection.Tuple3;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -153,6 +154,9 @@ public class HoodieTableMetadataUtil {
   public static final String PARTITION_NAME_RECORD_INDEX = "record_index";
   public static final String PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX = 
"func_index_";
 
+  private HoodieTableMetadataUtil() {
+  }
+
   public static final Set<Class<?>> COLUMN_STATS_RECORD_SUPPORTED_TYPES = new 
HashSet<>(Arrays.asList(
       IntWrapper.class, BooleanWrapper.class, DateWrapper.class,
       DoubleWrapper.class, FloatWrapper.class, LongWrapper.class,
@@ -162,7 +166,7 @@ public class HoodieTableMetadataUtil {
   // are reserved for future operations on the MDT.
   private static final int PARTITION_INITIALIZATION_TIME_SUFFIX = 10; // 
corresponds to "010";
   // we have max of 4 partitions (FILES, COL_STATS, BLOOM, RLI)
-  private static final List<String> 
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = 
Arrays.asList("010","011","012","013");
+  private static final List<String> 
VALID_PARTITION_INITIALIZATION_TIME_SUFFIXES = Arrays.asList("010", "011", 
"012", "013");
 
   /**
    * Returns whether the files partition of metadata table is ready for read.
@@ -205,7 +209,7 @@ public class HoodieTableMetadataUtil {
       // For each column (field) we have to index update corresponding column 
stats
       // with the values from this record
       targetFields.forEach(field -> {
-        ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), 
(ignored) -> new ColumnStats());
+        ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), 
ignored -> new ColumnStats());
 
         Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(recordSchema, 
field.name());
         Object fieldValue;
@@ -235,7 +239,7 @@ public class HoodieTableMetadataUtil {
     });
 
     Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, 
HoodieColumnRangeMetadata<Comparable>>> collector =
-        Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), 
Function.identity());
+        Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, 
Function.identity());
 
     return (Map<String, HoodieColumnRangeMetadata<Comparable>>) 
targetFields.stream()
         .map(field -> {
@@ -274,14 +278,14 @@ public class HoodieTableMetadataUtil {
 
   public static Option<String> getColumnStatsValueAsString(Object statsValue) {
     if (statsValue == null) {
-      LOG.info("Invalid column stats value: " + statsValue);
+      LOG.info("Invalid column stats value: {}", statsValue);
       return Option.empty();
     }
     Class<?> statsValueClass = statsValue.getClass();
     if (COLUMN_STATS_RECORD_SUPPORTED_TYPES.contains(statsValueClass)) {
       return Option.of(String.valueOf(((IndexedRecord) statsValue).get(0)));
     } else {
-      throw new RuntimeException("Unsupported type: " + 
statsValueClass.getSimpleName());
+      throw new HoodieNotSupportedException("Unsupported type: " + 
statsValueClass.getSimpleName());
     }
   }
 
@@ -329,28 +333,44 @@ public class HoodieTableMetadataUtil {
   /**
    * Convert commit action to metadata records for the enabled partition types.
    *
-   * @param commitMetadata          - Commit action metadata
-   * @param hoodieConfig            - Hudi configs
-   * @param instantTime             - Action instant time
-   * @param recordsGenerationParams - Parameters for the record generation
+   * @param context                          - Engine context to use
+   * @param hoodieConfig                     - Hudi configs
+   * @param commitMetadata                   - Commit action metadata
+   * @param instantTime                      - Action instant time
+   * @param dataMetaClient                   - HoodieTableMetaClient for data
+   * @param enabledPartitionTypes            - List of enabled MDT partitions
+   * @param bloomFilterType                  - Type of generated bloom filter 
records
+   * @param bloomIndexParallelism            - Parallelism for bloom filter 
record generation
+   * @param isColumnStatsIndexEnabled        - Is column stats index enabled
+   * @param columnStatsIndexParallelism      - Parallelism for column stats 
index records generation
+   * @param targetColumnsForColumnStatsIndex - List of columns for column 
stats index
    * @return Map of partition to metadata records for the commit action
    */
-  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(
-      HoodieEngineContext context, HoodieConfig hoodieConfig, 
HoodieCommitMetadata commitMetadata,
-      String instantTime, MetadataRecordsGenerationParams 
recordsGenerationParams) {
+  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(HoodieEngineContext context,
+                                                                               
               HoodieConfig hoodieConfig,
+                                                                               
               HoodieCommitMetadata commitMetadata,
+                                                                               
               String instantTime,
+                                                                               
               HoodieTableMetaClient dataMetaClient,
+                                                                               
               List<MetadataPartitionType> enabledPartitionTypes,
+                                                                               
               String bloomFilterType,
+                                                                               
               int bloomIndexParallelism,
+                                                                               
               boolean isColumnStatsIndexEnabled,
+                                                                               
               int columnStatsIndexParallelism,
+                                                                               
               List<String> targetColumnsForColumnStatsIndex) {
     final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
         convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 
1);
     partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecordsRDD);
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
 {
+    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
       final HoodieData<HoodieRecord> metadataBloomFilterRecords = 
convertMetadataToBloomFilterRecords(
-          context, hoodieConfig, commitMetadata, instantTime, 
recordsGenerationParams);
+          context, hoodieConfig, commitMetadata, instantTime, dataMetaClient, 
bloomFilterType, bloomIndexParallelism);
       partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
metadataBloomFilterRecords);
     }
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
 {
-      final HoodieData<HoodieRecord> metadataColumnStatsRDD = 
convertMetadataToColumnStatsRecords(commitMetadata, context, 
recordsGenerationParams);
+    if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
+      final HoodieData<HoodieRecord> metadataColumnStatsRDD = 
convertMetadataToColumnStatsRecords(commitMetadata, context,
+              dataMetaClient, isColumnStatsIndexEnabled, 
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
metadataColumnStatsRDD);
     }
     return partitionToRecordsMap;
@@ -387,7 +407,7 @@ public class HoodieTableMetadataUtil {
                         String pathWithPartition = stat.getPath();
                         if (pathWithPartition == null) {
                           // Empty partition
-                          LOG.warn("Unable to find path in write stat to 
update metadata table " + stat);
+                          LOG.warn("Unable to find path in write stat to 
update metadata table {}", stat);
                           return map;
                         }
 
@@ -401,9 +421,7 @@ public class HoodieTableMetadataUtil {
 
                         Map<String, Long> cdcPathAndSizes = stat.getCdcStats();
                         if (cdcPathAndSizes != null && 
!cdcPathAndSizes.isEmpty()) {
-                          cdcPathAndSizes.entrySet().forEach(cdcEntry -> {
-                            map.put(FSUtils.getFileName(cdcEntry.getKey(), 
partitionStatName), cdcEntry.getValue());
-                          });
+                          cdcPathAndSizes.forEach((key, value) -> 
map.put(FSUtils.getFileName(key, partitionStatName), value));
                         }
                         return map;
                       },
@@ -417,8 +435,8 @@ public class HoodieTableMetadataUtil {
 
     records.addAll(updatedPartitionFilesRecords);
 
-    LOG.info(String.format("Updating at %s from Commit/%s. 
#partitions_updated=%d, #files_added=%d", instantTime, 
commitMetadata.getOperationType(),
-        records.size(), newFileCount.value()));
+    LOG.info("Updating at {} from Commit/{}. #partitions_updated={}, 
#files_added={}", instantTime, commitMetadata.getOperationType(),
+        records.size(), newFileCount.value());
 
     return records;
   }
@@ -447,21 +465,28 @@ public class HoodieTableMetadataUtil {
    * Convert commit action metadata to bloom filter records.
    *
    * @param context                 - Engine context to use
+   * @param hoodieConfig            - Hudi configs
    * @param commitMetadata          - Commit action metadata
    * @param instantTime             - Action instant time
-   * @param recordsGenerationParams - Parameters for bloom filter record 
generation
+   * @param dataMetaClient          - HoodieTableMetaClient for data
+   * @param bloomFilterType         - Type of generated bloom filter records
+   * @param bloomIndexParallelism   - Parallelism for bloom filter record 
generation
    * @return HoodieData of metadata table records
    */
-  public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
-      HoodieEngineContext context, HoodieConfig hoodieConfig, 
HoodieCommitMetadata commitMetadata,
-      String instantTime, MetadataRecordsGenerationParams 
recordsGenerationParams) {
+  public static HoodieData<HoodieRecord> 
convertMetadataToBloomFilterRecords(HoodieEngineContext context,
+                                                                             
HoodieConfig hoodieConfig,
+                                                                             
HoodieCommitMetadata commitMetadata,
+                                                                             
String instantTime,
+                                                                             
HoodieTableMetaClient dataMetaClient,
+                                                                             
String bloomFilterType,
+                                                                             
int bloomIndexParallelism) {
     final List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
-        .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+        .flatMap(Collection::stream).collect(Collectors.toList());
     if (allWriteStats.isEmpty()) {
       return context.emptyHoodieData();
     }
 
-    final int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getBloomIndexParallelism()), 1);
+    final int parallelism = Math.max(Math.min(allWriteStats.size(), 
bloomIndexParallelism), 1);
     HoodieData<HoodieWriteStat> allWriteStatsRDD = 
context.parallelize(allWriteStats, parallelism);
     return allWriteStatsRDD.flatMap(hoodieWriteStat -> {
       final String partition = hoodieWriteStat.getPartitionPath();
@@ -474,7 +499,7 @@ public class HoodieTableMetadataUtil {
       String pathWithPartition = hoodieWriteStat.getPath();
       if (pathWithPartition == null) {
         // Empty partition
-        LOG.error("Failed to find path in write stat to update metadata table 
" + hoodieWriteStat);
+        LOG.error("Failed to find path in write stat to update metadata table 
{}", hoodieWriteStat);
         return Collections.emptyListIterator();
       }
 
@@ -483,28 +508,26 @@ public class HoodieTableMetadataUtil {
         return Collections.emptyListIterator();
       }
 
-      final Path writeFilePath = new 
Path(recordsGenerationParams.getDataMetaClient().getBasePath(), 
pathWithPartition);
+      final Path writeFilePath = new Path(dataMetaClient.getBasePathV2(), 
pathWithPartition);
       try (HoodieFileReader fileReader =
                
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO).getFileReader(
-                   hoodieConfig, 
recordsGenerationParams.getDataMetaClient().getHadoopConf(), writeFilePath)) {
+                   hoodieConfig, dataMetaClient.getHadoopConf(), 
writeFilePath)) {
         try {
           final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
           if (fileBloomFilter == null) {
-            LOG.error("Failed to read bloom filter for " + writeFilePath);
+            LOG.error("Failed to read bloom filter for {}", writeFilePath);
             return Collections.emptyListIterator();
           }
           ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(getUTF8Bytes(fileBloomFilter.serializeToString()));
           HoodieRecord record = 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partition, fileName, instantTime, 
recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+              partition, fileName, instantTime, bloomFilterType, 
bloomByteBuffer, false);
           return Collections.singletonList(record).iterator();
         } catch (Exception e) {
-          LOG.error("Failed to read bloom filter for " + writeFilePath);
+          LOG.error("Failed to read bloom filter for {}", writeFilePath);
           return Collections.emptyListIterator();
-        } finally {
-          fileReader.close();
         }
       } catch (IOException e) {
-        LOG.error("Failed to get bloom filter for file: " + writeFilePath + ", 
write stat: " + hoodieWriteStat);
+        LOG.error("Failed to get bloom filter for file: {}, write stat: {}", 
writeFilePath, hoodieWriteStat);
       }
       return Collections.emptyListIterator();
     });
@@ -515,22 +538,28 @@ public class HoodieTableMetadataUtil {
    */
   public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(HoodieEngineContext engineContext,
                                                                                
               HoodieCleanMetadata cleanMetadata,
-                                                                               
               MetadataRecordsGenerationParams recordsGenerationParams,
-                                                                               
               String instantTime) {
+                                                                               
               String instantTime,
+                                                                               
               HoodieTableMetaClient dataMetaClient,
+                                                                               
               List<MetadataPartitionType> enabledPartitionTypes,
+                                                                               
               int bloomIndexParallelism,
+                                                                               
               boolean isColumnStatsIndexEnabled,
+                                                                               
               int columnStatsIndexParallelism,
+                                                                               
               List<String> targetColumnsForColumnStatsIndex) {
     final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
     final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
engineContext.parallelize(
         convertMetadataToFilesPartitionRecords(cleanMetadata, instantTime), 1);
     partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecordsRDD);
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.BLOOM_FILTERS))
 {
+    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
       final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD =
-          convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, 
instantTime, recordsGenerationParams);
+          convertMetadataToBloomFilterRecords(cleanMetadata, engineContext, 
instantTime, bloomIndexParallelism);
       partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
metadataBloomFilterRecordsRDD);
     }
 
-    if 
(recordsGenerationParams.getEnabledPartitionTypes().contains(MetadataPartitionType.COLUMN_STATS))
 {
+    if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
       final HoodieData<HoodieRecord> metadataColumnStatsRDD =
-          convertMetadataToColumnStatsRecords(cleanMetadata, engineContext, 
recordsGenerationParams);
+          convertMetadataToColumnStatsRecords(cleanMetadata, engineContext,
+                  dataMetaClient, isColumnStatsIndexEnabled, 
columnStatsIndexParallelism, targetColumnsForColumnStatsIndex);
       partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, 
metadataColumnStatsRDD);
     }
 
@@ -566,8 +595,8 @@ public class HoodieTableMetadataUtil {
       // if there are partitions to be deleted, add them to delete list
       
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, 
true));
     }
-    LOG.info("Updating at " + instantTime + " from Clean. 
#partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0] + ", #partitions_deleted=" 
+ deletedPartitions.size());
+    LOG.info("Updating at {} from Clean. #partitions_updated={}, 
#files_deleted={}, #partitions_deleted={}",
+            instantTime, records.size(), fileDeleteCount[0], 
deletedPartitions.size());
     return records;
   }
 
@@ -600,8 +629,8 @@ public class HoodieTableMetadataUtil {
       
records.add(HoodieMetadataPayload.createPartitionListRecord(deletedPartitions, 
true));
     }
 
-    LOG.info("Re-adding missing records at " + instantTime + " during Restore. 
#partitions_updated=" + records.size()
-        + ", #files_added=" + filesAddedCount[0] + ", #files_deleted=" + 
fileDeleteCount[0] + ", #partitions_deleted=" + deletedPartitions.size());
+    LOG.info("Re-adding missing records at {} during Restore. 
#partitions_updated={}, #files_added={}, #files_deleted={}, 
#partitions_deleted={}",
+            instantTime, records.size(), filesAddedCount[0], 
fileDeleteCount[0], deletedPartitions.size());
     return Collections.singletonMap(MetadataPartitionType.FILES, 
engineContext.parallelize(records, 1));
   }
 
@@ -611,13 +640,13 @@ public class HoodieTableMetadataUtil {
    * @param cleanMetadata           - Clean action metadata
    * @param engineContext           - Engine context
    * @param instantTime             - Clean action instant time
-   * @param recordsGenerationParams - Parameters for bloom filter record 
generation
+   * @param bloomIndexParallelism   - Parallelism for bloom filter record 
generation
    * @return List of bloom filter index records for the clean metadata
    */
   public static HoodieData<HoodieRecord> 
convertMetadataToBloomFilterRecords(HoodieCleanMetadata cleanMetadata,
                                                                              
HoodieEngineContext engineContext,
                                                                              
String instantTime,
-                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
+                                                                             
int bloomIndexParallelism) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
       // Files deleted from a partition
@@ -630,7 +659,7 @@ public class HoodieTableMetadataUtil {
       });
     });
 
-    final int parallelism = Math.max(Math.min(deleteFileList.size(), 
recordsGenerationParams.getBloomIndexParallelism()), 1);
+    final int parallelism = Math.max(Math.min(deleteFileList.size(), 
bloomIndexParallelism), 1);
     HoodieData<Pair<String, String>> deleteFileListRDD = 
engineContext.parallelize(deleteFileList, parallelism);
     return deleteFileListRDD.map(deleteFileInfoPair -> 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
         deleteFileInfoPair.getLeft(), deleteFileInfoPair.getRight(), 
instantTime, StringUtils.EMPTY_STRING,
@@ -640,14 +669,20 @@ public class HoodieTableMetadataUtil {
   /**
    * Convert clean metadata to column stats index records.
    *
-   * @param cleanMetadata           - Clean action metadata
-   * @param engineContext           - Engine context
-   * @param recordsGenerationParams - Parameters for bloom filter record 
generation
+   * @param cleanMetadata                    - Clean action metadata
+   * @param engineContext                    - Engine context
+   * @param dataMetaClient                   - HoodieTableMetaClient for data
+   * @param isColumnStatsIndexEnabled        - Is column stats index enabled
+   * @param columnStatsIndexParallelism      - Parallelism for column stats 
index records generation
+   * @param targetColumnsForColumnStatsIndex - List of columns for column 
stats index
    * @return List of column stats index records for the clean metadata
    */
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
                                                                              
HoodieEngineContext engineContext,
-                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
+                                                                             
HoodieTableMetaClient dataMetaClient,
+                                                                             
boolean isColumnStatsIndexEnabled,
+                                                                             
int columnStatsIndexParallelism,
+                                                                             
List<String> targetColumnsForColumnStatsIndex) {
     List<Pair<String, String>> deleteFileList = new ArrayList<>();
     cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
       // Files deleted from a partition
@@ -655,25 +690,23 @@ public class HoodieTableMetadataUtil {
       deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition, 
entry)));
     });
 
-    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
-
     List<String> columnsToIndex =
-        getColumnsToIndex(recordsGenerationParams,
-            Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+        getColumnsToIndex(isColumnStatsIndexEnabled, 
targetColumnsForColumnStatsIndex,
+            Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
 
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
 
-    int parallelism = Math.max(Math.min(deleteFileList.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+    int parallelism = Math.max(Math.min(deleteFileList.size(), 
columnStatsIndexParallelism), 1);
     return engineContext.parallelize(deleteFileList, parallelism)
         .flatMap(deleteFileInfoPair -> {
           String partitionPath = deleteFileInfoPair.getLeft();
           String filePath = deleteFileInfoPair.getRight();
 
           if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension()) 
|| ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
-            return getColumnStatsRecords(partitionPath, filePath, 
dataTableMetaClient, columnsToIndex, true).iterator();
+            return getColumnStatsRecords(partitionPath, filePath, 
dataMetaClient, columnsToIndex, true).iterator();
           }
           return Collections.emptyListIterator();
         });
@@ -719,7 +752,7 @@ public class HoodieTableMetadataUtil {
         }
       });
     } catch (IOException e) {
-      throw new HoodieMetadataException("Parsing rollback plan for " + 
rollbackInstant.toString() + " failed ");
+      throw new HoodieMetadataException("Parsing rollback plan for " + 
rollbackInstant + " failed ");
     }
   }
 
@@ -810,8 +843,8 @@ public class HoodieTableMetadataUtil {
       records.add(record);
     });
 
-    LOG.info("Found at " + instantTime + " from " + operation + ". 
#partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + 
fileChangeCount[1]);
+    LOG.info("Found at {} from {}. #partitions_updated={}, #files_deleted={}, 
#files_appended={}",
+            instantTime, operation, records.size(), fileChangeCount[0], 
fileChangeCount[1]);
 
     return records;
   }
@@ -841,19 +874,21 @@ public class HoodieTableMetadataUtil {
   public static HoodieData<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
                                                                           
Map<String, List<String>> partitionToDeletedFiles,
                                                                           
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                          
MetadataRecordsGenerationParams recordsGenerationParams,
-                                                                          
String instantTime) {
+                                                                          
String instantTime,
+                                                                          
HoodieTableMetaClient dataMetaClient,
+                                                                          int 
bloomIndexParallelism,
+                                                                          
String bloomFilterType) {
     // Create the tuple (partition, filename, isDeleted) to handle both 
deletes and appends
     final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = 
fetchPartitionFileInfoTriplets(partitionToDeletedFiles, 
partitionToAppendedFiles);
 
     // Create records MDT
-    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
recordsGenerationParams.getBloomIndexParallelism()), 1);
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
bloomIndexParallelism), 1);
     return engineContext.parallelize(partitionFileFlagTupleList, 
parallelism).flatMap(partitionFileFlagTuple -> {
       final String partitionName = partitionFileFlagTuple.f0;
       final String filename = partitionFileFlagTuple.f1;
       final boolean isDeleted = partitionFileFlagTuple.f2;
       if (!FSUtils.isBaseFile(new Path(filename))) {
-        LOG.warn(String.format("Ignoring file %s as it is not a base file", 
filename));
+        LOG.warn("Ignoring file {} as it is not a base file", filename);
         return Stream.<HoodieRecord>empty().iterator();
       }
 
@@ -861,18 +896,18 @@ public class HoodieTableMetadataUtil {
       ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
       if (!isDeleted) {
         final String pathWithPartition = partitionName + "/" + filename;
-        final Path addedFilePath = new 
Path(recordsGenerationParams.getDataMetaClient().getBasePath(), 
pathWithPartition);
-        bloomFilterBuffer = 
readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), 
addedFilePath);
+        final Path addedFilePath = new Path(dataMetaClient.getBasePathV2(), 
pathWithPartition);
+        bloomFilterBuffer = readBloomFilter(dataMetaClient.getHadoopConf(), 
addedFilePath);
 
         // If reading the bloom filter failed then do not add a record for 
this file
         if (bloomFilterBuffer == null) {
-          LOG.error("Failed to read bloom filter from " + addedFilePath);
+          LOG.error("Failed to read bloom filter from {}", addedFilePath);
           return Stream.<HoodieRecord>empty().iterator();
         }
       }
 
       return 
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-              partitionName, filename, instantTime, 
recordsGenerationParams.getBloomFilterType(), bloomFilterBuffer, 
partitionFileFlagTuple.f2))
+              partitionName, filename, instantTime, bloomFilterType, 
bloomFilterBuffer, partitionFileFlagTuple.f2))
           .iterator();
     });
   }
@@ -883,35 +918,37 @@ public class HoodieTableMetadataUtil {
   public static HoodieData<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
                                                                           
Map<String, List<String>> partitionToDeletedFiles,
                                                                           
Map<String, Map<String, Long>> partitionToAppendedFiles,
-                                                                          
MetadataRecordsGenerationParams recordsGenerationParams) {
+                                                                          
HoodieTableMetaClient dataMetaClient,
+                                                                          
boolean isColumnStatsIndexEnabled,
+                                                                          int 
columnStatsIndexParallelism,
+                                                                          
List<String> targetColumnsForColumnStatsIndex) {
     // Find the columns to index
-    HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
     final List<String> columnsToIndex =
-        getColumnsToIndex(recordsGenerationParams,
-            Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
+        getColumnsToIndex(isColumnStatsIndexEnabled, 
targetColumnsForColumnStatsIndex,
+            Lazy.lazily(() -> tryResolveSchemaForTable(dataMetaClient)));
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
 
-    LOG.info(String.format("Indexing %d columns for column stats index", 
columnsToIndex.size()));
+    LOG.info("Indexing {} columns for column stats index", 
columnsToIndex.size());
 
     // Create the tuple (partition, filename, isDeleted) to handle both 
deletes and appends
     final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = 
fetchPartitionFileInfoTriplets(partitionToDeletedFiles, 
partitionToAppendedFiles);
 
     // Create records MDT
-    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), 
columnStatsIndexParallelism), 1);
     return engineContext.parallelize(partitionFileFlagTupleList, 
parallelism).flatMap(partitionFileFlagTuple -> {
       final String partitionName = partitionFileFlagTuple.f0;
       final String filename = partitionFileFlagTuple.f1;
       final boolean isDeleted = partitionFileFlagTuple.f2;
       if (!FSUtils.isBaseFile(new Path(filename)) || 
!filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        LOG.warn(String.format("Ignoring file %s as it is not a PARQUET file", 
filename));
+        LOG.warn("Ignoring file {} as it is not a PARQUET file", filename);
         return Stream.<HoodieRecord>empty().iterator();
       }
 
       final String filePathWithPartition = partitionName + "/" + filename;
-      return getColumnStatsRecords(partitionName, filePathWithPartition, 
dataTableMetaClient, columnsToIndex, isDeleted).iterator();
+      return getColumnStatsRecords(partitionName, filePathWithPartition, 
dataMetaClient, columnsToIndex, isDeleted).iterator();
     });
   }
 
@@ -973,7 +1010,7 @@ public class HoodieTableMetadataUtil {
    */
   public static List<FileSlice> getPartitionLatestMergedFileSlices(
       HoodieTableMetaClient metaClient, HoodieTableFileSystemView fsView, 
String partition) {
-    LOG.info("Loading latest merged file slices for metadata table partition " 
+ partition);
+    LOG.info("Loading latest merged file slices for metadata table partition 
{}", partition);
     return getPartitionFileSlices(metaClient, Option.of(fsView), partition, 
true);
   }
 
@@ -988,7 +1025,7 @@ public class HoodieTableMetadataUtil {
    */
   public static List<FileSlice> 
getPartitionLatestFileSlices(HoodieTableMetaClient metaClient,
                                                              
Option<HoodieTableFileSystemView> fsView, String partition) {
-    LOG.info("Loading latest file slices for metadata table partition " + 
partition);
+    LOG.info("Loading latest file slices for metadata table partition {}", 
partition);
     return getPartitionFileSlices(metaClient, fsView, partition, false);
   }
 
@@ -1063,7 +1100,10 @@ public class HoodieTableMetadataUtil {
 
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
                                                                              
HoodieEngineContext engineContext,
-                                                                             
MetadataRecordsGenerationParams recordsGenerationParams) {
+                                                                             
HoodieTableMetaClient dataMetaClient,
+                                                                             
boolean isColumnStatsIndexEnabled,
+                                                                             
int columnStatsIndexParallelism,
+                                                                             
List<String> targetColumnsForColumnStatsIndex) {
     List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
 
@@ -1079,14 +1119,13 @@ public class HoodieTableMetadataUtil {
                       ? Option.empty()
                       : Option.of(new Schema.Parser().parse(writerSchemaStr)));
 
-      HoodieTableMetaClient dataTableMetaClient = 
recordsGenerationParams.getDataMetaClient();
-      HoodieTableConfig tableConfig = dataTableMetaClient.getTableConfig();
+      HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
 
       // NOTE: Writer schema added to commit metadata will not contain Hudi's 
metadata fields
       Option<Schema> tableSchema = writerSchema.map(schema ->
           tableConfig.populateMetaFields() ? addMetadataFields(schema) : 
schema);
 
-      List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
+      List<String> columnsToIndex = 
getColumnsToIndex(isColumnStatsIndexEnabled, targetColumnsForColumnStatsIndex,
           Lazy.eagerly(tableSchema));
 
       if (columnsToIndex.isEmpty()) {
@@ -1094,10 +1133,10 @@ public class HoodieTableMetadataUtil {
         return engineContext.emptyHoodieData();
       }
 
-      int parallelism = Math.max(Math.min(allWriteStats.size(), 
recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
+      int parallelism = Math.max(Math.min(allWriteStats.size(), 
columnStatsIndexParallelism), 1);
       return engineContext.parallelize(allWriteStats, parallelism)
           .flatMap(writeStat ->
-              translateWriteStatToColumnStats(writeStat, dataTableMetaClient, 
columnsToIndex).iterator());
+              translateWriteStatToColumnStats(writeStat, dataMetaClient, 
columnsToIndex).iterator());
     } catch (Exception e) {
       throw new HoodieException("Failed to generate column stats records for 
metadata table", e);
     }
@@ -1106,13 +1145,13 @@ public class HoodieTableMetadataUtil {
   /**
    * Get the list of columns for the table for column stats indexing
    */
-  private static List<String> 
getColumnsToIndex(MetadataRecordsGenerationParams recordsGenParams,
+  private static List<String> getColumnsToIndex(boolean 
isColumnStatsIndexEnabled,
+                                                List<String> 
targetColumnsForColumnStatsIndex,
                                                 Lazy<Option<Schema>> 
lazyWriterSchemaOpt) {
-    checkState(recordsGenParams.isColumnStatsIndexEnabled());
+    checkState(isColumnStatsIndexEnabled);
 
-    List<String> targetColumns = 
recordsGenParams.getTargetColumnsForColumnStatsIndex();
-    if (!targetColumns.isEmpty()) {
-      return targetColumns;
+    if (!targetColumnsForColumnStatsIndex.isEmpty()) {
+      return targetColumnsForColumnStatsIndex;
     }
 
     Option<Schema> writerSchemaOpt = lazyWriterSchemaOpt.get();
@@ -1164,19 +1203,17 @@ public class HoodieTableMetadataUtil {
                                                                                
          List<String> columnsToIndex) {
     try {
       if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        Path fullFilePath = new Path(datasetMetaClient.getBasePath(), 
filePath);
-        List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
+        Path fullFilePath = new Path(datasetMetaClient.getBasePathV2(), 
filePath);
+        return
             new 
ParquetUtils().readRangeFromParquetMetadata(datasetMetaClient.getHadoopConf(), 
fullFilePath, columnsToIndex);
-
-        return columnRangeMetadataList;
       }
 
-      LOG.warn("Column range index not supported for: " + filePath);
+      LOG.warn("Column range index not supported for: {}", filePath);
       return Collections.emptyList();
     } catch (Exception e) {
       // NOTE: In case reading column range metadata from individual file 
failed,
       //       we simply fall back, in lieu of failing the whole task
-      LOG.error("Failed to fetch column range metadata for: " + filePath);
+      LOG.error("Failed to fetch column range metadata for: {}", filePath);
       return Collections.emptyList();
     }
   }
@@ -1224,13 +1261,13 @@ public class HoodieTableMetadataUtil {
       TableSchemaResolver schemaResolver = new 
TableSchemaResolver(dataTableMetaClient);
       return Option.of(schemaResolver.getTableAvroSchema());
     } catch (Exception e) {
-      throw new HoodieException("Failed to get latest columns for " + 
dataTableMetaClient.getBasePath(), e);
+      throw new HoodieException("Failed to get latest columns for " + 
dataTableMetaClient.getBasePathV2(), e);
     }
   }
 
   /**
    * Given a schema, coerces provided value to instance of {@link 
Comparable<?>} such that
-   * it could subsequently used in column stats
+   * it could subsequently be used in column stats
    *
    * NOTE: This method has to stay compatible with the semantic of
    *      {@link ParquetUtils#readRangeFromParquetMetadata} as they are used 
in tandem
@@ -1330,10 +1367,8 @@ public class HoodieTableMetadataUtil {
     // instant which we have a log block for.
     final String earliestInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
     
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
-        .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, earliestInstantTime))
-        .forEach(instant -> {
-          validInstantTimestamps.addAll(getRollbackedCommits(instant, 
datasetTimeline));
-        });
+            .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, earliestInstantTime))
+            .forEach(instant -> 
validInstantTimestamps.addAll(getRollbackedCommits(instant, datasetTimeline)));
 
     // add restore and rollback instants from MDT.
     
metadataMetaClient.getActiveTimeline().getRollbackAndRestoreTimeline().filterCompletedInstants()
@@ -1401,7 +1436,7 @@ public class HoodieTableMetadataUtil {
               timeline.readRollbackInfoAsBytes(new 
HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION,
                   instant.getTimestamp())).get(), HoodieRollbackPlan.class);
           commitsToRollback = 
Collections.singletonList(rollbackPlan.getInstantToRollback().getCommitTime());
-          LOG.warn("Had to fetch rollback info from requested instant since 
completed file is empty " + instant.toString());
+          LOG.warn("Had to fetch rollback info from requested instant since 
completed file is empty {}", instant);
         }
         return commitsToRollback;
       }
@@ -1411,9 +1446,8 @@ public class HoodieTableMetadataUtil {
         // Restore is made up of several rollbacks
         HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
             timeline.getInstantDetails(instant).get());
-        restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-          rms.forEach(rm -> rollbackedCommits.addAll(rm.getCommitsRollback()));
-        });
+        restoreMetadata.getHoodieRestoreMetadata().values()
+                .forEach(rms -> rms.forEach(rm -> 
rollbackedCommits.addAll(rm.getCommitsRollback())));
       }
       return rollbackedCommits;
     } catch (IOException e) {
@@ -1447,7 +1481,7 @@ public class HoodieTableMetadataUtil {
 
     if (backup) {
       final Path metadataBackupPath = new Path(metadataTablePath.getParent(), 
".metadata_" + dataMetaClient.createNewInstantTime(false));
-      LOG.info("Backing up metadata directory to " + metadataBackupPath + " 
before deletion");
+      LOG.info("Backing up metadata directory to {} before deletion", 
metadataBackupPath);
       try {
         if (fs.rename(metadataTablePath, metadataBackupPath)) {
           return metadataBackupPath.toString();
@@ -1458,7 +1492,7 @@ public class HoodieTableMetadataUtil {
       }
     }
 
-    LOG.info("Deleting metadata table from " + metadataTablePath);
+    LOG.info("Deleting metadata table from {}", metadataTablePath);
     try {
       fs.delete(metadataTablePath, true);
     } catch (Exception e) {
@@ -1486,7 +1520,7 @@ public class HoodieTableMetadataUtil {
       return deleteMetadataTable(dataMetaClient, context, backup);
     }
 
-    final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePath()),
 partitionPath);
+    final Path metadataTablePartitionPath = new 
Path(HoodieTableMetadata.getMetadataTableBasePath(dataMetaClient.getBasePathV2()),
 partitionPath);
     FileSystem fs = HadoopFSUtils.getFs(metadataTablePartitionPath.toString(), 
context.getHadoopConf().get());
     dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, 
partitionPath, false);
     try {
@@ -1495,7 +1529,7 @@ public class HoodieTableMetadataUtil {
       }
     } catch (FileNotFoundException e) {
       // Ignoring exception as metadata table already does not exist
-      LOG.debug("Metadata table partition " + partitionPath + " not found at 
path " + metadataTablePartitionPath);
+      LOG.debug("Metadata table partition {} not found at path {}", 
partitionPath, metadataTablePartitionPath);
       return null;
     } catch (Exception e) {
       throw new HoodieMetadataException(String.format("Failed to check 
existence of MDT partition %s at path %s: ", partitionPath, 
metadataTablePartitionPath), e);
@@ -1504,7 +1538,7 @@ public class HoodieTableMetadataUtil {
     if (backup) {
       final Path metadataPartitionBackupPath = new 
Path(metadataTablePartitionPath.getParent().getParent(),
           String.format(".metadata_%s_%s", partitionPath, 
dataMetaClient.createNewInstantTime(false)));
-      LOG.info(String.format("Backing up MDT partition %s to %s before 
deletion", partitionPath, metadataPartitionBackupPath));
+      LOG.info("Backing up MDT partition {} to {} before deletion", 
partitionPath, metadataPartitionBackupPath);
       try {
         if (fs.rename(metadataTablePartitionPath, 
metadataPartitionBackupPath)) {
           return metadataPartitionBackupPath.toString();
@@ -1514,7 +1548,7 @@ public class HoodieTableMetadataUtil {
         LOG.error(String.format("Failed to backup MDT partition %s using 
rename", partitionPath), e);
       }
     } else {
-      LOG.info("Deleting metadata table partition from " + 
metadataTablePartitionPath);
+      LOG.info("Deleting metadata table partition from {}", 
metadataTablePartitionPath);
       try {
         fs.delete(metadataTablePartitionPath, true);
       } catch (Exception e) {
@@ -1621,10 +1655,10 @@ public class HoodieTableMetadataUtil {
       }
     }
 
-    LOG.info(String.format("Estimated file group count for MDT partition %s is 
%d "
-            + "[recordCount=%d, avgRecordSize=%d, minFileGroupCount=%d, 
maxFileGroupCount=%d, growthFactor=%f, "
-            + "maxFileGroupSizeBytes=%d]", partitionType.name(), 
fileGroupCount, recordCount, averageRecordSize, minFileGroupCount,
-        maxFileGroupCount, growthFactor, maxFileGroupSizeBytes));
+    LOG.info("Estimated file group count for MDT partition {} is {} "
+            + "[recordCount={}, avgRecordSize={}, minFileGroupCount={}, 
maxFileGroupCount={}, growthFactor={}, "
+            + "maxFileGroupSizeBytes={}]", partitionType.name(), 
fileGroupCount, recordCount, averageRecordSize, minFileGroupCount,
+        maxFileGroupCount, growthFactor, maxFileGroupSizeBytes);
     return fileGroupCount;
   }
 
@@ -1648,10 +1682,7 @@ public class HoodieTableMetadataUtil {
     }
 
     // Does any enabled partition being enabled need to track the written 
records
-    if (config.enableRecordIndex()) {
-      return true;
-    }
-    return false;
+    return config.enableRecordIndex();
   }
 
   /**
@@ -1768,12 +1799,14 @@ public class HoodieTableMetadataUtil {
             .withReaderSchema(HoodieAvroUtils.getRecordKeySchema())
             
.withLatestInstantTime(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(""))
             .withReverseReader(false)
-            
.withMaxMemorySizeInBytes(configuration.get().getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(),
 DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
+            .withMaxMemorySizeInBytes(configuration.get()
+                .getLongBytes(MAX_MEMORY_FOR_COMPACTION.key(), 
DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES))
             
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
             .withPartition(fileSlice.getPartitionPath())
             
.withOptimizedLogBlocksScan(configuration.get().getBoolean("hoodie" + 
HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN, false))
             
.withDiskMapType(configuration.get().getEnum(SPILLABLE_DISK_MAP_TYPE.key(), 
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
-            
.withBitCaskDiskMapCompressionEnabled(configuration.get().getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
 DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
+            .withBitCaskDiskMapCompressionEnabled(configuration.get()
+                .getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), 
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
             .withRecordMerger(HoodieRecordUtils.createRecordMerger(
                 metaClient.getBasePathV2().toString(),
                 engineType,
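
For illustration only (not part of the commit): a minimal caller sketch, using a hypothetical wrapper class ColumnStatsFromClean, showing how the settings that were previously bundled into MetadataRecordsGenerationParams are now passed directly to the new convertMetadataToColumnStatsRecords signature above. Since getColumnsToIndex still calls checkState on the enable flag, this path is only valid when the column stats index is enabled.

import java.util.List;

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

public class ColumnStatsFromClean {

  // Builds column stats index records for a clean action. Every value that the
  // removed MetadataRecordsGenerationParams used to carry (meta client, enable
  // flag, parallelism, target columns) is now an explicit argument.
  public static HoodieData<HoodieRecord> columnStatsRecords(
      HoodieCleanMetadata cleanMetadata,
      HoodieEngineContext engineContext,
      HoodieTableMetaClient dataMetaClient,
      boolean isColumnStatsIndexEnabled,
      int columnStatsIndexParallelism,
      List<String> targetColumnsForColumnStatsIndex) {
    // getColumnsToIndex validates this flag internally, so callers should only
    // reach this point when the column stats index is enabled.
    return HoodieTableMetadataUtil.convertMetadataToColumnStatsRecords(
        cleanMetadata,
        engineContext,
        dataMetaClient,
        isColumnStatsIndexEnabled,
        columnStatsIndexParallelism,
        targetColumnsForColumnStatsIndex);
  }
}
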
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
deleted file mode 100644
index 72a8bf4cd26..00000000000
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataRecordsGenerationParams.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.metadata;
-
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Encapsulates all parameters required to generate metadata index for enabled 
index types.
- *
- * @deprecated this component currently duplicates configuration coming from 
the {@code HoodieWriteConfig}
- *             which is problematic; instead we should break this component 
down and use source of truth
- *             for each respective data-point directly ({@code 
HoodieWriteConfig}, {@code HoodieTableMetaClient}, etc)
- */
-@Deprecated
-public class MetadataRecordsGenerationParams implements Serializable {
-
-  private final HoodieTableMetaClient dataMetaClient;
-  private final List<MetadataPartitionType> enabledPartitionTypes;
-  private final String bloomFilterType;
-  private final int bloomIndexParallelism;
-  private final boolean isColumnStatsIndexEnabled;
-  private final int columnStatsIndexParallelism;
-  private final List<String> targetColumnsForColumnStatsIndex;
-  private final List<String> targetColumnsForBloomFilterIndex;
-
-  MetadataRecordsGenerationParams(HoodieTableMetaClient dataMetaClient, 
List<MetadataPartitionType> enabledPartitionTypes, String bloomFilterType, int 
bloomIndexParallelism,
-                                  boolean isColumnStatsIndexEnabled, int 
columnStatsIndexParallelism, List<String> targetColumnsForColumnStatsIndex, 
List<String> targetColumnsForBloomFilterIndex) {
-    this.dataMetaClient = dataMetaClient;
-    this.enabledPartitionTypes = enabledPartitionTypes;
-    this.bloomFilterType = bloomFilterType;
-    this.bloomIndexParallelism = bloomIndexParallelism;
-    this.isColumnStatsIndexEnabled = isColumnStatsIndexEnabled;
-    this.columnStatsIndexParallelism = columnStatsIndexParallelism;
-    this.targetColumnsForColumnStatsIndex = targetColumnsForColumnStatsIndex;
-    this.targetColumnsForBloomFilterIndex = targetColumnsForBloomFilterIndex;
-  }
-
-  public HoodieTableMetaClient getDataMetaClient() {
-    return dataMetaClient;
-  }
-
-  public List<MetadataPartitionType> getEnabledPartitionTypes() {
-    return enabledPartitionTypes;
-  }
-
-  public String getBloomFilterType() {
-    return bloomFilterType;
-  }
-
-  public boolean isColumnStatsIndexEnabled() {
-    return isColumnStatsIndexEnabled;
-  }
-
-  public int getBloomIndexParallelism() {
-    return bloomIndexParallelism;
-  }
-
-  public int getColumnStatsIndexParallelism() {
-    return columnStatsIndexParallelism;
-  }
-
-  public List<String> getTargetColumnsForColumnStatsIndex() {
-    return targetColumnsForColumnStatsIndex;
-  }
-
-  public List<String> getSecondaryKeysForBloomFilterIndex() {
-    return targetColumnsForBloomFilterIndex;
-  }
-}
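
For illustration only (hypothetical wrapper, not part of the commit): the equivalent sketch for the bloom filter path over clean metadata, which now needs just the bloom index parallelism value instead of the removed params object.

import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

public class BloomFilterFromClean {

  // The clean-metadata bloom filter path now takes only the parallelism value
  // rather than the full MetadataRecordsGenerationParams object.
  public static HoodieData<HoodieRecord> bloomFilterRecords(
      HoodieCleanMetadata cleanMetadata,
      HoodieEngineContext engineContext,
      String instantTime,
      int bloomIndexParallelism) {
    return HoodieTableMetadataUtil.convertMetadataToBloomFilterRecords(
        cleanMetadata, engineContext, instantTime, bloomIndexParallelism);
  }
}

Passing these values individually ties each call site to its actual source of truth (for example HoodieWriteConfig or HoodieTableMetaClient), which is the rationale stated in the deprecation note of the removed class.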
