codope commented on code in PR #8758:
URL: https://github.com/apache/hudi/pull/8758#discussion_r1231893906


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction(
   /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
-   * @param dataMetaClient           - {@code HoodieTableMetaClient} for the 
dataset.
+   * @param initializationTime       - Timestamp to use for the commit
+   * @param partitionsToInit         - List of MDT partitions to initialize
    * @param inflightInstantTimestamp - Current action instant responsible for 
this initialization
    */
-  private boolean initializeFromFilesystem(HoodieTableMetaClient 
dataMetaClient,
+  private boolean initializeFromFilesystem(String initializationTime, 
List<MetadataPartitionType> partitionsToInit,
                                            Option<String> 
inflightInstantTimestamp) throws IOException {
     if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) {
       return false;
     }
 
-    String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-
-    initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
-    initTableMetadata();
-    // if async metadata indexing is enabled,
-    // then only initialize files partition as other partitions will be built 
using HoodieIndexer
-    List<MetadataPartitionType> enabledPartitionTypes =  new ArrayList<>();
-    if (dataWriteConfig.isMetadataAsyncIndex()) {
-      enabledPartitionTypes.add(MetadataPartitionType.FILES);
-    } else {
-      // all enabled ones should be initialized
-      enabledPartitionTypes = this.enabledPartitionTypes;
+    // FILES partition is always initialized first
+    ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES)
+            || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit);
+
+    metadataMetaClient = initializeMetaClient();
+
+    // Get a complete list of files and partitions from the file system or 
from already initialized FILES partition of MDT
+    boolean filesPartitionAvailable = 
dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES);
+    List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : 
listAllPartitionsFromFilesystem(initializationTime);
+    Map<String, Map<String, Long>> partitionToFilesMap = 
partitionInfoList.stream()
+            .map(p -> {
+              String partitionName = 
HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+              return Pair.of(partitionName, p.getFileNameToSizeMap());
+            })
+            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+    for (MetadataPartitionType partitionType : partitionsToInit) {
+      // Find the commit timestamp to use for this partition. Each 
initialization should use its own unique commit time.
+      String commitTimeForPartition = 
generateUniqueCommitInstantTime(initializationTime);
+
+      LOG.info("Initializing MDT partition " + partitionType + " at instant " 
+ commitTimeForPartition);
+
+      Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
+      switch (partitionType) {
+        case FILES:
+          fileGroupCountAndRecordsPair = 
initializeFilesPartition(initializationTime, partitionInfoList);
+          break;
+        case BLOOM_FILTERS:
+          fileGroupCountAndRecordsPair = 
initializeBloomFiltersPartition(initializationTime, partitionToFilesMap);
+          break;
+        case COLUMN_STATS:
+          fileGroupCountAndRecordsPair = 
initializeColumnStatsPartition(partitionToFilesMap);
+          break;
+        case RECORD_INDEX:
+          fileGroupCountAndRecordsPair = initializeRecordIndexPartition();
+          break;
+        default:
+          throw new HoodieMetadataException("Unsupported MDT partition type: " 
+ partitionType);
+      }
+
+      // Generate the file groups
+      final int fileGroupCount = fileGroupCountAndRecordsPair.getKey();
+      ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0");
+      initializeFileGroups(dataMetaClient, partitionType, 
commitTimeForPartition, fileGroupCount);
+
+      // Perform the commit using bulkCommit
+      HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue();
+      bulkCommit(commitTimeForPartition, partitionType, records, 
fileGroupCount);
+      metadataMetaClient.reloadActiveTimeline();
+      dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true);
+      initMetadataReader();
     }
-    initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
enabledPartitionTypes);
-    initialCommit(createInstantTime, enabledPartitionTypes);
-    updateInitializedPartitionsInTableConfig(enabledPartitionTypes);
+
     return true;
   }
 
-  private String getInitialCommitInstantTime(HoodieTableMetaClient 
dataMetaClient) {
-    // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
-    // Otherwise, we use the timestamp of the latest completed action.
-    String createInstantTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants()
-        
.getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
-    return createInstantTime;
+  /**
+   * Returns a unique timestamp to use for initializing a MDT partition.
+   * <p>
+   * Since commits are immutable, we should use unique timestamps to 
initialize each partition. For this, we will add a suffix to the given 
initializationTime
+   * until we find a unique timestamp.
+   *
+   * @param initializationTime Timestamp from dataset to use for initialization
+   * @return a unique timestamp for MDT
+   */
+  private String generateUniqueCommitInstantTime(String initializationTime) {
+    // Add suffix to initializationTime to find an unused instant time for the 
next index initialization.
+    // This function would be called multiple times in a single application if 
multiple indexes are being
+    // initialized one after the other.
+    for (int offset = 0; ; ++offset) {
+      final String commitInstantTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset);
+      if 
(!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) {
+        return commitInstantTime;
+      }
+    }
   }
 
-  private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, 
Option<String> inflightInstantTimestamp) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeColumnStatsPartition(Map<String, Map<String, Long>> 
partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams());
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeBloomFiltersPartition(String createInstantTime, Map<String, 
Map<String, Long>> partitionToFilesMap) {
+    HoodieData<HoodieRecord> records = 
HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+        engineContext, Collections.emptyMap(), partitionToFilesMap, 
getRecordsGenerationParams(), createInstantTime);
+
+    final int fileGroupCount = 
dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+    return Pair.of(fileGroupCount, records);
+  }
+
+  private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition() throws IOException {
+    // Open the MDT reader to create a file system view
+    initMetadataReader();
+    final HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(dataMetaClient,
+        dataMetaClient.getActiveTimeline(), metadata);
+
+    // Collect the list of latest base files present in each partition
+    List<String> partitions = metadata.getAllPartitionPaths();
+    final List<Pair<String, String>> partitionBaseFilePairs = new 
ArrayList<>();
+    for (String partition : partitions) {
+      partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
+          .map(basefile -> Pair.of(partition, 
basefile.getFileName())).collect(Collectors.toList()));
+    }
 
+    LOG.info("Initializing record index from " + partitionBaseFilePairs.size() 
+ " base files in "
+        + partitions.size() + " partitions");
+
+    // Collect record keys from the files in parallel
+    HoodieData<HoodieRecord> records = 
readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs);
+    records.persist("MEMORY_AND_DISK_SER");
+    final long recordCount = records.count();
+
+    // Initialize the file groups
+    final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX, recordCount,
+        RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(),
+        dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(),
+        dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
+
+    LOG.info(String.format("Initializing record index with %d mappings and %d file groups.", recordCount, fileGroupCount));
+    return Pair.of(fileGroupCount, records);
+  }
+
+  /**
+   * Read the record keys from base files in partitions and return records.
+   */
+  private HoodieData<HoodieRecord> 
readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
+      List<Pair<String, String>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from base files");
+    return engineContext.parallelize(partitionBaseFilePairs, 
partitionBaseFilePairs.size()).flatMap(p -> {
+      final String partition = p.getKey();
+      final String filename = p.getValue();
+      Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + filename);
+
+      final String fileId = FSUtils.getFileId(filename);
+      final String instantTime = FSUtils.getCommitTime(filename);
+      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath);

Review Comment:
   > we are returning an iterator with reference to the reader
   
   How so? Why do we even need the reader later?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to