nsivabalan commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1192725066
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -562,53 +532,144 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - - initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); - initTableMetadata(); - // if async metadata indexing is enabled, - // then only initialize files partition as other partitions will be built using HoodieIndexer - List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>(); - if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); - } else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; + // FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) + || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + + metadataMetaClient = initializeMetaClient(); + + // Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT + boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); + List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; + switch (partitionType) { + case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; + case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; + case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; + default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + // Perform the commit using bulkCommit + HoodieData<HoodieRecord> records = 
fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + metadataMetaClient.reloadActiveTimeline(); + dataMetaClient = HoodieTableMetadataUtil.setMetadataPartitionState(dataMetaClient, partitionType, true); } - initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - initialCommit(createInstantTime, enabledPartitionTypes); - updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { - // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the timestamp of the latest completed action. - String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() - .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); - return createInstantTime; + /** + * Returns a unique timestamp to use for initializing a MDT partition. + * <p> + * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime + * until we find a unique timestamp. + * + * @param initializationTime Timestamp from dataset to use for initialization + * @return a unique timestamp for MDT + */ + private String generateUniqueCommitInstantTime(String initializationTime) { + for (int offset = 0; ; ++offset) { Review Comment: if we can add docs for this, would be nice. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -562,53 +532,144 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - - initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); - initTableMetadata(); - // if async metadata indexing is enabled, - // then only initialize files partition as other partitions will be built using HoodieIndexer - List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>(); - if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); - } else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; + // FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) + || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + + metadataMetaClient = initializeMetaClient(); + + // Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT + boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); + List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; + switch (partitionType) { + case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; + case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; + case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; + default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); Review Comment: thanks! -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org