nsivabalan commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1190657749


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieMetadataBulkInsertPartitioner.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+
+import scala.Tuple2;
+
+/**
+ * A {@code BulkInsertPartitioner} implementation for Metadata Table to 
improve performance of initialization of metadata
+ * table partition when a very large number of records are inserted.
+ *
+ * This partitioner requires the records to be already tagged with location.
+ */
+public class SparkHoodieMetadataBulkInsertPartitioner implements 
BulkInsertPartitioner<JavaRDD<HoodieRecord>> {
+  final int numPartitions;
+  public SparkHoodieMetadataBulkInsertPartitioner(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  private class FileGroupPartitioner extends Partitioner {
+
+    @Override
+    public int getPartition(Object key) {
+      return ((Tuple2<Integer, String>)key)._1;
+    }
+
+    @Override
+    public int numPartitions() {
+      return numPartitions;
+    }
+  }
+
+  // FileIDs for the various partitions
+  private List<String> fileIDPfxs;
+
+  /**
+   * Partition the records by their location. The number of partitions is 
determined by the number of MDT fileGroups being udpated rather than the
+   * specific value of outputSparkPartitions.
+   */
+  @Override
+  public JavaRDD<HoodieRecord> repartitionRecords(JavaRDD<HoodieRecord> 
records, int outputSparkPartitions) {
+    Comparator<Tuple2<Integer, String>> keyComparator =
+            (Comparator<Tuple2<Integer, String>> & Serializable)(t1, t2) -> 
t1._2.compareTo(t2._2);
+
+    // Partition the records by their file group
+    JavaRDD<HoodieRecord> partitionedRDD = records
+            // key by <file group index, record key>. The file group index is 
used to partition and the record key is used to sort within the partition.
+            .keyBy(r -> {
+              int fileGroupIndex = 
HoodieTableMetadataUtil.getFileGroupIndexFromFileId(r.getCurrentLocation().getFileId());
+              return new Tuple2<>(fileGroupIndex, r.getRecordKey());
+            })
+            .repartitionAndSortWithinPartitions(new FileGroupPartitioner(), 
keyComparator)
+            .map(t -> t._2);
+
+    fileIDPfxs = partitionedRDD.mapPartitions(recordItr -> {
+      // Due to partitioning, all record in the partition should have same 
fileID. So we only can get the fileID prefix from the first record.
+      List<String> fileIds = new ArrayList<>(1);
+      if (recordItr.hasNext()) {
+        HoodieRecord record = recordItr.next();
+        final String fileID = 
HoodieTableMetadataUtil.getFileGroupPrefix(record.getCurrentLocation().getFileId());
+        fileIds.add(fileID);
+      } else {
+        // Empty partition
+        fileIds.add("");

Review Comment:
   Can you help me understand when we might hit this?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -670,17 +670,75 @@ private Long getTableChecksum() {
     return getLong(TABLE_CHECKSUM);
   }
 
-  public List<String> getMetadataPartitionsInflight() {
-    return StringUtils.split(
-        getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
-        CONFIG_VALUES_DELIMITER
-    );
+  public Set<String> getMetadataPartitionsInflight() {
+    return new HashSet<>(StringUtils.split(
+            getStringOrDefault(TABLE_METADATA_PARTITIONS_INFLIGHT, 
StringUtils.EMPTY_STRING),
+            CONFIG_VALUES_DELIMITER));
   }
 
   public Set<String> getMetadataPartitions() {
     return new HashSet<>(
-        StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
-            CONFIG_VALUES_DELIMITER));
+            StringUtils.split(getStringOrDefault(TABLE_METADATA_PARTITIONS, 
StringUtils.EMPTY_STRING),
+                    CONFIG_VALUES_DELIMITER));
+  }
+
+  /**
+   * @returns true if metadata table has been created and is being used for 
this dataset, else returns false.
+   */
+  public boolean isMetadataTableEnabled() {
+    return isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+  }
+
+  /**
+   * Checks if metadata table is enabled and the specified partition has been 
initialized.
+   *
+   * @param partition The partition to check
+   * @returns true if the specific partition has been initialized, else 
returns false.
+   */
+  public boolean isMetadataPartitionEnabled(MetadataPartitionType partition) {
+    return getMetadataPartitions().contains(partition.getPartitionPath());
+  }
+
+  /**
+   * Enables or disables the specified metadata table partition.
+   *
+   * @param partition The partition
+   * @param enabled   If true, the partition is enabled, else disabled
+   */
+  public void setMetadataPartitionState(MetadataPartitionType partition, 
boolean enabled) {
+    
ValidationUtils.checkArgument(!partition.getPartitionPath().contains(CONFIG_VALUES_DELIMITER),
+            "Metadata Table partition path cannot contain a comma: " + 
partition.getPartitionPath());
+    Set<String> partitions = getMetadataPartitions();
+    Set<String> partitionsInflight = getMetadataPartitionsInflight();
+    if (enabled) {
+      partitions.add(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    } else if (partition.equals(MetadataPartitionType.FILES)) {
+      // file listing partition is required for all other partitions to work
+      // Disabling file partition will also disable all partitions
+      partitions.clear();
+      partitionsInflight.clear();
+    } else {
+      partitions.remove(partition.getPartitionPath());
+      partitionsInflight.remove(partition.getPartitionPath());
+    }
+    setValue(TABLE_METADATA_PARTITIONS, 
partitions.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+    setValue(TABLE_METADATA_PARTITIONS_INFLIGHT, 
partitionsInflight.stream().sorted().collect(Collectors.joining(CONFIG_VALUES_DELIMITER)));
+  }
+
+  /**
+   * Enables the specified metadata table partition as inflight.
+   *
+   * @param partition The list of partitions to enable as inflight.
+   */
+  public void setMetadataPartitionsAsInflight(List<MetadataPartitionType> 
partitionTypes) {
+    Set<String> partitions = getMetadataPartitionsInflight();

Review Comment:
   Minor: can we rename this to "partitionsInflight"?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -562,53 +532,144 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);

Review Comment:
   So do we create an empty delete block followed by a bulk insert with the same
fileId?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -892,25 +917,24 @@ public void buildMetadataPartitions(HoodieEngineContext 
engineContext, List<Hood
       }
       partitionTypes.add(partitionType);
     });
-    // before initial commit update inflight indexes in table config
-    Set<String> inflightIndexes = 
getInflightMetadataPartitions(dataMetaClient.getTableConfig());
-    
inflightIndexes.addAll(indexPartitionInfos.stream().map(HoodieIndexPartitionInfo::getMetadataPartitionPath).collect(Collectors.toSet()));
-    
dataMetaClient.getTableConfig().setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT.key(),
 String.join(",", inflightIndexes));
-    HoodieTableConfig.update(dataMetaClient.getFs(), new 
Path(dataMetaClient.getMetaPath()), dataMetaClient.getTableConfig().getProps());
-    initialCommit(indexUptoInstantTime + METADATA_INDEXER_TIME_SUFFIX, 
partitionTypes);
+
+    // before initialization set these  partitions as inflight in table config
+    HoodieTableMetadataUtil.setMetadataInflightPartitions(dataMetaClient, 
partitionTypes);

Review Comment:
   W.r.t. the 2nd flight: we should be cautious about updating the table config on the fly.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -373,105 +356,92 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {

Review Comment:
   Not for this patch, but in the future we might need to design this better.
   This assumes that every partition apart from FILES will have to go via the async
indexer. But we should let the user decide which ones can go inline and which
ones need to go via async index building.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -33,10 +33,6 @@ public enum MetadataPartitionType {
   private final String partitionPath;
   // FileId prefix used for all file groups in this partition.
   private final String fileIdPrefix;
-  // Total file groups
-  // TODO fix: enum should not have any mutable aspect as this compromises 
whole idea
-  //      of the enum being static, immutable entity
-  private int fileGroupCount = 1;

Review Comment:
   We don't serialize this enum as-is anywhere, right? If we do, this is a
backwards-incompatible change. Just wanted to double-check.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/ScheduleIndexActionExecutor.java:
##########
@@ -100,15 +99,6 @@ public Option<HoodieIndexPlan> execute() {
       // get last completed instant
       Option<HoodieInstant> indexUptoInstant = 
table.getActiveTimeline().getContiguousCompletedWriteTimeline().lastInstant();
       if (indexUptoInstant.isPresent()) {
-        // start initializing file groups
-        // in case FILES partition itself was not initialized before (i.e. 
metadata was never enabled), this will initialize synchronously
-        HoodieTableMetadataWriter metadataWriter = 
table.getMetadataWriter(instantTime)
-            .orElseThrow(() -> new HoodieIndexException(String.format("Could 
not get metadata writer to initialize filegroups for indexing for instant: %s", 
instantTime)));
-        if 
(!finalPartitionsToIndex.get(0).getPartitionPath().equals(MetadataPartitionType.FILES.getPartitionPath()))
 {
-          // initialize metadata partition only if not for FILES partition.
-          metadataWriter.initializeMetadataPartitions(table.getMetaClient(), 
finalPartitionsToIndex, indexUptoInstant.get().getTimestamp());

Review Comment:
   If not here, where does the initialization happen for the file groups of
interest?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -562,53 +532,144 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient = 
HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, 
partitionType, true);
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    for (int offset = 0; ; ++offset) {

Review Comment:
   What's the purpose of offset?
   I thought we only initialize one partition from MDT at a time, so can you
help me understand the necessity here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -562,53 +532,144 @@ private <T extends SpecificRecordBase> boolean 
isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    
ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), 
"FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for 
MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient = 
HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, 
partitionType, true);
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {

Review Comment:
   Do we have tests for this?
   I.e., even if we enable 3 partitions, each commit will initialize just
one, and the subsequent commit will initialize the next, and so on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to