codope commented on code in PR #8758: URL: https://github.com/apache/hudi/pull/8758#discussion_r1213269335
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java: ########## @@ -99,7 +90,25 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Deletes the given metadata partitions. This path reuses DELETE_PARTITION operation. * * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline - * @param partitions - list of {@link MetadataPartitionType} to drop + * @param partitions - list of {@link MetadataPartitionType} to drop */ void deletePartitions(String instantTime, List<MetadataPartitionType> partitions); + + /** + * It returns write client for metadata table. + */ + BaseHoodieWriteClient getWriteClient(); Review Comment: rename to `getMetadataTableWriteClient` for clarity? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java: ########## @@ -99,7 +90,25 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * Deletes the given metadata partitions. This path reuses DELETE_PARTITION operation. * * @param instantTime - instant time when replacecommit corresponding to the drop will be recorded in the metadata timeline - * @param partitions - list of {@link MetadataPartitionType} to drop + * @param partitions - list of {@link MetadataPartitionType} to drop */ void deletePartitions(String instantTime, List<MetadataPartitionType> partitions); + + /** + * It returns write client for metadata table. + */ + BaseHoodieWriteClient getWriteClient(); + + /** + * Returns true if the metadata table is initialized. + */ + boolean isInitialized(); Review Comment: Is it needed? Can we not get this from table config? If MDT is initialized then we should have some MDT partition as value for `hoodie.table.metadata.partitions` or `hoodie.table.metadata.partitions.inflight` right? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java: ########## @@ -41,35 +44,23 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable { * @param engineContext * @param indexPartitionInfos - information about partitions to build such as partition type and base instant time */ - void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos); - - /** - * Initialize file groups for the given metadata partitions when indexing is requested. - * - * @param dataMetaClient - meta client for the data table - * @param metadataPartitions - metadata partitions for which file groups needs to be initialized - * @param instantTime - instant time of the index action - * @throws IOException - */ - void initializeMetadataPartitions(HoodieTableMetaClient dataMetaClient, List<MetadataPartitionType> metadataPartitions, String instantTime) throws IOException; + void buildMetadataPartitions(HoodieEngineContext engineContext, List<HoodieIndexPartitionInfo> indexPartitionInfos) throws IOException; /** * Drop the given metadata partitions. * - * @param metadataPartitions - * @throws IOException + * @param metadataPartitions List of MDT partitions to drop + * @throws IOException on failures */ void dropMetadataPartitions(List<MetadataPartitionType> metadataPartitions) throws IOException; /** * Update the metadata table due to a COMMIT operation. * - * @param commitMetadata commit metadata of the operation of interest. - * @param instantTime instant time of the commit. - * @param isTableServiceAction true if caller is a table service. false otherwise. Only regular write operations can trigger metadata table services and this argument - * will assist in this. + * @param commitMetadata commit metadata of the operation of interest. + * @param instantTime instant time of the commit. 
*/ - void update(HoodieCommitMetadata commitMetadata, String instantTime, boolean isTableServiceAction); + void update(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatuses, String instantTime); Review Comment: Why remove `isTableServiceAction`? Wouldn't we want to distinguish the update call due to regular ingestion writer from table service writer? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -18,14 +18,19 @@ package org.apache.hudi.metadata; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; Review Comment: nit: if we can avoid re-ordering imports, it would make review easier. Also, I think we put hudi imports first. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -241,105 +221,91 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() { return this.enabledPartitionTypes; } - /** - * Initialize the metadata table if it does not exist. - * <p> - * If the metadata table does not exist, then file and partition listing is used to initialize the table. - * - * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase - * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to initialize the metadata table. 
- */ - protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext, - Option<T> actionMetadata, - Option<String> inflightInstantTimestamp); - - public void initTableMetadata() { - try { - if (this.metadata != null) { - this.metadata.close(); - } - this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(), - dataWriteConfig.getBasePath(), dataWriteConfig.getSpillableMapBasePath()); - this.metadataMetaClient = metadata.getMetadataMetaClient(); - } catch (Exception e) { - throw new HoodieException("Error initializing metadata table for reads", e); - } - } - /** * Initialize the metadata table if needed. * * @param dataMetaClient - meta client for the data table * @param actionMetadata - optional action metadata * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset * @param <T> - action metadata types extending Avro generated SpecificRecordBase - * @throws IOException + * @throws IOException on errors */ - protected <T extends SpecificRecordBase> void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, - Option<T> actionMetadata, - Option<String> inflightInstantTimestamp) throws IOException { + protected <T extends SpecificRecordBase> boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option<T> actionMetadata, + Option<String> inflightInstantTimestamp) throws IOException { HoodieTimer timer = HoodieTimer.start(); + List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); - boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + try { + boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + if (!exists) { + // FILES partition is always required + partitionsToInit.add(MetadataPartitionType.FILES); + } - if (!exists) { - // Initialize for the first time by listing partitions and files directly from the file system - if (initializeFromFilesystem(dataMetaClient, 
inflightInstantTimestamp)) { - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); + // check if any of the enabled partition types needs to be initialized + // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. + if (!dataWriteConfig.isMetadataAsyncIndex()) { + Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); + LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions); + this.enabledPartitionTypes.stream() + .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) + .forEach(partitionsToInit::add); } - return; - } - // if metadata table exists, then check if any of the enabled partition types needs to be initialized - // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. 
- if (!dataWriteConfig.isMetadataAsyncIndex()) { - Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); - LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions); - List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream() - .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) - .collect(Collectors.toList()); - // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round - if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { - return; + if (partitionsToInit.isEmpty()) { + // No partitions to initialize + initMetadataReader(); + return true; Review Comment: if there are no partitions to init, should we return false? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -241,105 +221,91 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() { return this.enabledPartitionTypes; } - /** - * Initialize the metadata table if it does not exist. - * <p> - * If the metadata table does not exist, then file and partition listing is used to initialize the table. - * - * @param engineContext - * @param actionMetadata Action metadata types extending Avro generated SpecificRecordBase - * @param inflightInstantTimestamp Timestamp of an instant in progress on the dataset. This instant is ignored - * while deciding to initialize the metadata table. 
- */ - protected abstract <T extends SpecificRecordBase> void initialize(HoodieEngineContext engineContext, - Option<T> actionMetadata, - Option<String> inflightInstantTimestamp); - - public void initTableMetadata() { - try { - if (this.metadata != null) { - this.metadata.close(); - } - this.metadata = new HoodieBackedTableMetadata(engineContext, dataWriteConfig.getMetadataConfig(), - dataWriteConfig.getBasePath(), dataWriteConfig.getSpillableMapBasePath()); - this.metadataMetaClient = metadata.getMetadataMetaClient(); - } catch (Exception e) { - throw new HoodieException("Error initializing metadata table for reads", e); - } - } - /** * Initialize the metadata table if needed. * * @param dataMetaClient - meta client for the data table * @param actionMetadata - optional action metadata * @param inflightInstantTimestamp - timestamp of an instant in progress on the dataset * @param <T> - action metadata types extending Avro generated SpecificRecordBase - * @throws IOException + * @throws IOException on errors */ - protected <T extends SpecificRecordBase> void initializeIfNeeded(HoodieTableMetaClient dataMetaClient, - Option<T> actionMetadata, - Option<String> inflightInstantTimestamp) throws IOException { + protected <T extends SpecificRecordBase> boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option<T> actionMetadata, + Option<String> inflightInstantTimestamp) throws IOException { HoodieTimer timer = HoodieTimer.start(); + List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.values().length); - boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + try { + boolean exists = metadataTableExists(dataMetaClient, actionMetadata); + if (!exists) { + // FILES partition is always required + partitionsToInit.add(MetadataPartitionType.FILES); + } - if (!exists) { - // Initialize for the first time by listing partitions and files directly from the file system - if (initializeFromFilesystem(dataMetaClient, 
inflightInstantTimestamp)) { - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); + // check if any of the enabled partition types needs to be initialized + // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. + if (!dataWriteConfig.isMetadataAsyncIndex()) { + Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); + LOG.info("Async metadata indexing disabled and following partitions already initialized: " + inflightAndCompletedPartitions); + this.enabledPartitionTypes.stream() + .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) + .forEach(partitionsToInit::add); } - return; - } - // if metadata table exists, then check if any of the enabled partition types needs to be initialized - // NOTE: It needs to be guarded by async index config because if that is enabled then initialization happens through the index scheduler. 
- if (!dataWriteConfig.isMetadataAsyncIndex()) { - Set<String> inflightAndCompletedPartitions = getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig()); - LOG.info("Async metadata indexing enabled and following partitions already initialized: " + inflightAndCompletedPartitions); - List<MetadataPartitionType> partitionsToInit = this.enabledPartitionTypes.stream() - .filter(p -> !inflightAndCompletedPartitions.contains(p.getPartitionPath()) && !MetadataPartitionType.FILES.equals(p)) - .collect(Collectors.toList()); - // if there are no partitions to initialize or there is a pending operation, then don't initialize in this round - if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { - return; + if (partitionsToInit.isEmpty()) { + // No partitions to initialize + initMetadataReader(); + return true; + } + + // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit + // Otherwise, we use the timestamp of the latest completed action. 
+ String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + + // Initialize partitions for the first time using data from the files on the file system + if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) { + LOG.error("Failed to initialize MDT from filesystem"); + return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - initTableMetadata(); // re-init certain flags in BaseTableMetadata - initializeEnabledFileGroups(dataMetaClient, createInstantTime, partitionsToInit); - initialCommit(createInstantTime, partitionsToInit); - updateInitializedPartitionsInTableConfig(partitionsToInit); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); + return true; + } catch (IOException e) { + LOG.error("Failed to initialize metadata table. Disabling the writer.", e); + return false; } } private <T extends SpecificRecordBase> boolean metadataTableExists(HoodieTableMetaClient dataMetaClient, Option<T> actionMetadata) throws IOException { - boolean exists = dataMetaClient.getFs().exists(new Path(metadataWriteConfig.getBasePath(), - HoodieTableMetaClient.METAFOLDER_NAME)); + boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled(); boolean reInitialize = false; // If the un-synced instants have been archived, then // the metadata table will need to be initialized again. 
if (exists) { - HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()) - .setBasePath(metadataWriteConfig.getBasePath()).build(); - - if (DEFAULT_METADATA_POPULATE_META_FIELDS != metadataMetaClient.getTableConfig().populateMetaFields()) { - LOG.info("Re-initiating metadata table properties since populate meta fields have changed"); - metadataMetaClient = initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); + try { + metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build(); + if (DEFAULT_METADATA_POPULATE_META_FIELDS != metadataMetaClient.getTableConfig().populateMetaFields()) { + LOG.info("Re-initiating metadata table properties since populate meta fields have changed"); + metadataMetaClient = initializeMetaClient(); + } + } catch (TableNotFoundException e) { + // Table not found, initialize the metadata table. + metadataMetaClient = initializeMetaClient(); Review Comment: It's just initializing the metadata meta client and not the table. Is it intended? Should we throw the exception instead? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. 
+ * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - - initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); - initTableMetadata(); - // if async metadata indexing is enabled, - // then only initialize files partition as other partitions will be built using HoodieIndexer - List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>(); - if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); - } else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; + // FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) + || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + + metadataMetaClient = initializeMetaClient(); + + // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT + boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); + List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; + switch (partitionType) { + case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; + case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; + case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; + case RECORD_INDEX: + fileGroupCountAndRecordsPair = initializeRecordIndexPartition(); + break; + default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + // Perform the commit using bulkCommit + HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + 
metadataMetaClient.reloadActiveTimeline(); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true); + initMetadataReader(); } - initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - initialCommit(createInstantTime, enabledPartitionTypes); - updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { - // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the timestamp of the latest completed action. - String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() - .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); - return createInstantTime; + /** + * Returns a unique timestamp to use for initializing a MDT partition. + * <p> + * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime + * until we find a unique timestamp. + * + * @param initializationTime Timestamp from dataset to use for initialization + * @return a unique timestamp for MDT + */ + private String generateUniqueCommitInstantTime(String initializationTime) { + // Add suffix to initializationTime to find an unused instant time for the next index initialization. + // This function would be called multiple times in a single application if multiple indexes are being + // initialized one after the other. 
+ for (int offset = 0; ; ++offset) { + final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset); + if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) { + return commitInstantTime; + } + } } - private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) { - ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); + private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException { + // Open the MDT reader to create a file system view + initMetadataReader(); + final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, + dataMetaClient.getActiveTimeline(), metadata); + + // Collect the list of latest base files present in each partition + List<String> partitions = metadata.getAllPartitionPaths(); + final List<Pair<String, String>> 
partitionBaseFilePairs = new ArrayList<>(); + for (String partition : partitions) { + partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition) + .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList())); + } + LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " + + partitions.size() + " partitions"); + + // Collect record keys from the files in parallel + HoodieData<HoodieRecord> records = readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs); + records.persist("MEMORY_AND_DISK_SER"); + final long recordCount = records.count(); + + // Initialize the file groups + final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX, recordCount, + RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(), + dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(), + dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); + + LOG.info(String.format("Initializing record index with %d mappings and %d file groups.", recordCount, fileGroupCount)); + return Pair.of(fileGroupCount, records); + } + + /** + * Read the record keys from base files in partitions and return records. 
+ */ + private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, + List<Pair<String, String>> partitionBaseFilePairs) { + if (partitionBaseFilePairs.isEmpty()) { + return engineContext.emptyHoodieData(); + } + + engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from base files"); + return engineContext.parallelize(partitionBaseFilePairs, partitionBaseFilePairs.size()).flatMap(p -> { + final String partition = p.getKey(); + final String filename = p.getValue(); + Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename); + + final String fileId = FSUtils.getFileId(filename); + final String instantTime = FSUtils.getCommitTime(filename); + HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath); + Iterator<String> recordKeyIterator = reader.getRecordKeyIterator(); + + return new Iterator<HoodieRecord>() { + @Override + public boolean hasNext() { + return recordKeyIterator.hasNext(); + } + + @Override + public HoodieRecord next() { + return HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, + instantTime); + } + }; + }); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(String createInstantTime, List<DirectoryInfo> partitionInfoList) { Review Comment: `createInstantTime` is not being used. 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -147,84 +156,55 @@ protected <T extends SpecificRecordBase> HoodieBackedTableMetadataWriter(Configu this.metrics = Option.empty(); this.enabledPartitionTypes = new ArrayList<>(); - if (writeConfig.isMetadataTableEnabled()) { - this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX; - this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, failedWritesCleaningPolicy); - enabled = true; - - // Inline compaction and auto clean is required as we do not expose this table outside - ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), - "Cleaning is controlled internally for Metadata table."); - ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(), - "Compaction is controlled internally for metadata table."); - // Auto commit is required - ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), - "Auto commit is required for Metadata Table"); - ValidationUtils.checkArgument(this.metadataWriteConfig.getWriteStatusClassName().equals(FailOnFirstErrorWriteStatus.class.getName()), - "MDT should use " + FailOnFirstErrorWriteStatus.class.getName()); - // Metadata Table cannot have metadata listing turned on. (infinite loop, much?) 
- ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(), - "File listing cannot be used for Metadata Table"); - - this.dataMetaClient = - HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); - enablePartitions(); - initRegistry(); - initialize(engineContext, actionMetadata, inflightInstantTimestamp); - initTableMetadata(); - } else { - enabled = false; + this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); + + if (dataMetaClient.getTableConfig().isMetadataTableEnabled() || writeConfig.isMetadataTableEnabled()) { Review Comment: How would this work when, let's say, metadata was enabled and then disabled after a few commits? Should we flip the condition and check write config first? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. 
+ * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - - initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); - initTableMetadata(); - // if async metadata indexing is enabled, - // then only initialize files partition as other partitions will be built using HoodieIndexer - List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>(); - if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); - } else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; + // FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) + || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + + metadataMetaClient = initializeMetaClient(); + + // Get a complete list of files and partitions from the file system or from already initialized FILES partition of MDT + boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); + List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? 
listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; + switch (partitionType) { + case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; + case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; + case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; + case RECORD_INDEX: + fileGroupCountAndRecordsPair = initializeRecordIndexPartition(); + break; + default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + // Perform the commit using bulkCommit + HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + 
metadataMetaClient.reloadActiveTimeline(); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true); + initMetadataReader(); } - initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - initialCommit(createInstantTime, enabledPartitionTypes); - updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { - // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the timestamp of the latest completed action. - String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() - .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); - return createInstantTime; + /** + * Returns a unique timestamp to use for initializing a MDT partition. + * <p> + * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime + * until we find a unique timestamp. + * + * @param initializationTime Timestamp from dataset to use for initialization + * @return a unique timestamp for MDT + */ + private String generateUniqueCommitInstantTime(String initializationTime) { + // Add suffix to initializationTime to find an unused instant time for the next index initialization. + // This function would be called multiple times in a single application if multiple indexes are being + // initialized one after the other. 
+ for (int offset = 0; ; ++offset) { + final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset); + if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) { + return commitInstantTime; + } + } } - private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) { - ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); + private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException { + // Open the MDT reader to create a file system view + initMetadataReader(); Review Comment: maybe we can check first if this.metadata != null then reuse it else init? 
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -430,53 +395,212 @@ private <T extends SpecificRecordBase> boolean isCommitRevertedByInFlightAction( /** * Initialize the Metadata Table by listing files and partitions from the file system. * - * @param dataMetaClient - {@code HoodieTableMetaClient} for the dataset. + * @param initializationTime - Timestamp to use for the commit + * @param partitionsToInit - List of MDT partitions to initialize * @param inflightInstantTimestamp - Current action instant responsible for this initialization */ - private boolean initializeFromFilesystem(HoodieTableMetaClient dataMetaClient, + private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit, Option<String> inflightInstantTimestamp) throws IOException { if (anyPendingDataInstant(dataMetaClient, inflightInstantTimestamp)) { return false; } - String createInstantTime = getInitialCommitInstantTime(dataMetaClient); - - initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS); - initTableMetadata(); - // if async metadata indexing is enabled, - // then only initialize files partition as other partitions will be built using HoodieIndexer - List<MetadataPartitionType> enabledPartitionTypes = new ArrayList<>(); - if (dataWriteConfig.isMetadataAsyncIndex()) { - enabledPartitionTypes.add(MetadataPartitionType.FILES); - } else { - // all enabled ones should be initialized - enabledPartitionTypes = this.enabledPartitionTypes; + // FILES partition is always initialized first + ValidationUtils.checkArgument(!partitionsToInit.contains(MetadataPartitionType.FILES) + || partitionsToInit.get(0).equals(MetadataPartitionType.FILES), "FILES partition should be initialized first: " + partitionsToInit); + + metadataMetaClient = initializeMetaClient(); + + // Get a complete list of files and partitions from the file system or from already initialized FILES 
partition of MDT + boolean filesPartitionAvailable = dataMetaClient.getTableConfig().isMetadataPartitionEnabled(MetadataPartitionType.FILES); + List<DirectoryInfo> partitionInfoList = filesPartitionAvailable ? listAllPartitionsFromMDT(initializationTime) : listAllPartitionsFromFilesystem(initializationTime); + Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() + .map(p -> { + String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); + return Pair.of(partitionName, p.getFileNameToSizeMap()); + }) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + + for (MetadataPartitionType partitionType : partitionsToInit) { + // Find the commit timestamp to use for this partition. Each initialization should use its own unique commit time. + String commitTimeForPartition = generateUniqueCommitInstantTime(initializationTime); + + LOG.info("Initializing MDT partition " + partitionType + " at instant " + commitTimeForPartition); + + Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair; + switch (partitionType) { + case FILES: + fileGroupCountAndRecordsPair = initializeFilesPartition(initializationTime, partitionInfoList); + break; + case BLOOM_FILTERS: + fileGroupCountAndRecordsPair = initializeBloomFiltersPartition(initializationTime, partitionToFilesMap); + break; + case COLUMN_STATS: + fileGroupCountAndRecordsPair = initializeColumnStatsPartition(partitionToFilesMap); + break; + case RECORD_INDEX: + fileGroupCountAndRecordsPair = initializeRecordIndexPartition(); + break; + default: + throw new HoodieMetadataException("Unsupported MDT partition type: " + partitionType); + } + + // Generate the file groups + final int fileGroupCount = fileGroupCountAndRecordsPair.getKey(); + ValidationUtils.checkArgument(fileGroupCount > 0, "FileGroup count for MDT partition " + partitionType + " should be > 0"); + initializeFileGroups(dataMetaClient, partitionType, commitTimeForPartition, fileGroupCount); + + 
// Perform the commit using bulkCommit + HoodieData<HoodieRecord> records = fileGroupCountAndRecordsPair.getValue(); + bulkCommit(commitTimeForPartition, partitionType, records, fileGroupCount); + metadataMetaClient.reloadActiveTimeline(); + dataMetaClient.getTableConfig().setMetadataPartitionState(dataMetaClient, partitionType, true); + initMetadataReader(); } - initializeEnabledFileGroups(dataMetaClient, createInstantTime, enabledPartitionTypes); - initialCommit(createInstantTime, enabledPartitionTypes); - updateInitializedPartitionsInTableConfig(enabledPartitionTypes); + return true; } - private String getInitialCommitInstantTime(HoodieTableMetaClient dataMetaClient) { - // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit - // Otherwise, we use the timestamp of the latest completed action. - String createInstantTime = dataMetaClient.getActiveTimeline().filterCompletedInstants() - .getReverseOrderedInstants().findFirst().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime); - return createInstantTime; + /** + * Returns a unique timestamp to use for initializing a MDT partition. + * <p> + * Since commits are immutable, we should use unique timestamps to initialize each partition. For this, we will add a suffix to the given initializationTime + * until we find a unique timestamp. + * + * @param initializationTime Timestamp from dataset to use for initialization + * @return a unique timestamp for MDT + */ + private String generateUniqueCommitInstantTime(String initializationTime) { + // Add suffix to initializationTime to find an unused instant time for the next index initialization. + // This function would be called multiple times in a single application if multiple indexes are being + // initialized one after the other. 
+ for (int offset = 0; ; ++offset) { + final String commitInstantTime = HoodieTableMetadataUtil.createIndexInitTimestamp(initializationTime, offset); + if (!metadataMetaClient.getCommitsTimeline().containsInstant(commitInstantTime)) { + return commitInstantTime; + } + } } - private boolean anyPendingDataInstant(HoodieTableMetaClient dataMetaClient, Option<String> inflightInstantTimestamp) { - ValidationUtils.checkState(enabled, "Metadata table cannot be initialized as it is not enabled"); + private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeBloomFiltersPartition(String createInstantTime, Map<String, Map<String, Long>> partitionToFilesMap) { + HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( + engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); + + final int fileGroupCount = dataWriteConfig.getMetadataConfig().getBloomFilterIndexFileGroupCount(); + return Pair.of(fileGroupCount, records); + } + + private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition() throws IOException { + // Open the MDT reader to create a file system view + initMetadataReader(); + final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, + dataMetaClient.getActiveTimeline(), metadata); + + // Collect the list of latest base files present in each partition + List<String> partitions = metadata.getAllPartitionPaths(); + final List<Pair<String, String>> 
partitionBaseFilePairs = new ArrayList<>(); + for (String partition : partitions) { + partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition) Review Comment: In certain scenarios, such as when using the bucket index or the HBase index, inserts can go directly to log files, so the latest file slice may have no base file. Should we use `fsView.getLatestFileSlices` instead? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -726,25 +832,18 @@ private interface ConvertMetadataFunction { /** * Processes commit metadata from data table and commits to metadata table. * - * @param instantTime instant time of interest. + * @param instantTime instant time of interest. * @param convertMetadataFunction converter function to convert the respective metadata to List of HoodieRecords to be written to metadata table. - * @param <T> type of commit metadata. - * @param canTriggerTableService true if table services can be triggered. false otherwise. */ - private <T> void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction, boolean canTriggerTableService) { - if (!dataWriteConfig.isMetadataTableEnabled()) { - return; - } + private void processAndCommit(String instantTime, ConvertMetadataFunction convertMetadataFunction) { Review Comment: I'd like to understand the intention behind removing `canTriggerTableService`. Is it that we will preemptively check and trigger table services, if needed, before committing to MDT? ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -933,52 +1041,80 @@ protected HoodieData<HoodieRecord> prepRecords(Map<MetadataPartitionType, } /** - * Perform a compaction on the Metadata Table. - * - * Cases to be handled: - * 1. We cannot perform compaction if there are previous inflight operations on the dataset.
This is because - * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx. - * - * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a - * deltacommit. + * Optimize the metadata table by running compaction, clean and archive as required. + * <p> + * Don't perform optimization if there are inflight operations on the dataset. This is for two reasons: Review Comment: But isn't it quite likely that there will be some inflight operation on the dataset most of the time? Wouldn't we then keep deferring the optimization every time this is the case? ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java: ########## @@ -86,15 +89,15 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { super.moveInflightCommitToComplete(instantTime, metadata); if (writer != null) { - writer.update(metadata, instantTime, false); + writer.update(metadata, HoodieListData.eager(Collections.EMPTY_LIST), instantTime); Review Comment: `HoodieListData` won't be convertible to `HoodieJavaRDD`, which is what the Spark metadata writer will use. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table.
- writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. - * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled. */ - private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) { - // List all partitions in the basePath of the containing dataset - LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - - Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); - - // skip file system listing to populate metadata records if it's a fresh table. - // this is applicable only if the table already has N commits and metadata is enabled at a later point in time. - if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. 
- LOG.info("Triggering empty Commit to metadata to initialize"); - } else { - List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); - Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); - return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { - // Record which saves the list of all partitions - HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); - HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); - ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } + private boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) { + // we need to find if there are any inflights in data table timeline before or equal to the latest delta commit in metadata table. + // Whenever you want to change this logic, please ensure all below scenarios are considered. + // a. 
There could be a chance that latest delta commit in MDT is committed in MDT, but failed in DT. And so findInstantsBeforeOrEquals() should be employed + // b. There could be DT inflights after latest delta commit in MDT and we are ok with it. bcoz, the contract is, latest compaction instant time in MDT represents + // any instants before that is already synced with metadata table. + // c. Do consider out of order commits. For eg, c4 from DT could complete before c3. and we can't trigger compaction in MDT with c4 as base instant time, until every + // instant before c4 is synced with metadata table. + List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() + .findInstantsBeforeOrEquals(latestDeltaCommitTimeInMetadataTable).getInstants(); - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); - } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); + if (!pendingInstants.isEmpty()) { + checkNumDeltaCommits(metadataMetaClient, dataWriteConfig.getMetadataConfig().getMaxNumDeltacommitsWhenPending()); + LOG.info(String.format( + "Cannot compact metadata table as there are %d inflight instants in data table before latest deltacommit in metadata table: %s. 
Inflight instants in data table: %s", + pendingInstants.size(), latestDeltaCommitTimeInMetadataTable, Arrays.toString(pendingInstants.toArray()))); + return false; } - commit(createInstantTime, partitionToRecordsMap, false); + return true; } - private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> partitionInfoList, HoodieRecord allPartitionRecord) { - HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); - if (partitionInfoList.isEmpty()) { - return filesPartitionRecords; - } + /** + * Return records that represent update to the record index due to write operation on the dataset. + * + * @param writeStatuses (@code WriteStatus} from the write operation + */ + private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> writeStatuses) { + return writeStatuses.flatMap(writeStatus -> { + List<HoodieRecord> recordList = new LinkedList<>(); + for (HoodieRecord writtenRecord : writeStatus.getWrittenRecords()) { + if (!writeStatus.isErrored(writtenRecord.getKey())) { + HoodieRecord hoodieRecord; + HoodieKey key = writtenRecord.getKey(); + Option<HoodieRecordLocation> newLocation = writtenRecord.getNewLocation(); + if (newLocation.isPresent()) { + if (writtenRecord.getCurrentLocation() != null) { + // This is an update, no need to update index if the location has not changed + // newLocation should have the same fileID as currentLocation. The instantTimes differ as newLocation's + // instantTime refers to the current commit which was completed. + if (!writtenRecord.getCurrentLocation().getFileId().equals(newLocation.get().getFileId())) { + final String msg = String.format("Detected update in location of record with key %s from %s " + + " to %s. 
The fileID should not change.", + writtenRecord.getKey(), writtenRecord.getCurrentLocation(), newLocation.get()); + LOG.error(msg); + throw new HoodieMetadataException(msg); + } else { + // TODO: This may be required for clustering usecases where record location changes Review Comment: I guess this may not be required as long as the write client/table service client is sending the write statuses properly. In the case of clustering, `currentLocation` will be null and `newLocation` will have some value (given that it goes through the create handle), so it's fine to just call `createRecordIndexUpdate`. ########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java: ########## @@ -77,7 +79,8 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy HoodieCommitMetadata commitMetadata = super.doWriteOperation(commitTime, operationType, newPartitionsToAdd, partitionToFilesNameLengthMap, bootstrap, createInflightCommit); if (writer != null && !createInflightCommit) { - writer.update(commitMetadata, commitTime, false); + writer.performTableServices(Option.of(commitTime)); Review Comment: I see, so this is where the table service optimization is done. Still, the concern about contention between MDT table services and inflight instants in the data table remains.
+ * @param instantTime - Action instant time for this commit + * @param partitionRecordsMap - Map of partition type to its records to commit */ - protected abstract void commit( - String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap, - boolean canTriggerTableService); + protected abstract void commit(String instantTime, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionRecordsMap); + + /** + * Commit the {@code HoodieRecord}s to Metadata Table as a new delta-commit using bulk commit (if supported). + * <p> + * This is used to optimize the initial commit to the MDT partition which may contains a large number of + * records and hence is more suited to bulkInsert for write performance. + * + * @param instantTime - Action instant time for this commit + * @param partitionType - The MDT partition to which records are to be committed + * @param records - records to be bulk inserted + * @param fileGroupCount - The maximum number of file groups to which the records will be written. + */ + protected void bulkCommit( Review Comment: +1 for this change. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -1000,87 +1136,78 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. - writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. 
- * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables to ensure compaction on MDT can be scheduled. */ - private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) { - // List all partitions in the basePath of the containing dataset - LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - - Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); - - // skip file system listing to populate metadata records if it's a fresh table. - // this is applicable only if the table already has N commits and metadata is enabled at a later point in time. - if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. 
- LOG.info("Triggering empty Commit to metadata to initialize"); - } else { - List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); - Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); - return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { - // Record which saves the list of all partitions - HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); - HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); - ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } + private boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltaCommitTimeInMetadataTable) { Review Comment: let's remove `inFlightInstantTimestamp` if not needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org