prashantwason commented on code in PR #8684: URL: https://github.com/apache/hudi/pull/8684#discussion_r1194267719
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ########## @@ -1097,87 +1165,76 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan // Trigger cleaning with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. - writeClient.clean(instantTime + "002"); + writeClient.clean(HoodieTableMetadataUtil.createCleanTimestamp(instantTime)); writeClient.lazyRollbackFailedIndexing(); } /** - * This is invoked to initialize metadata table for a dataset. - * Initial commit has special handling mechanism due to its scale compared to other regular commits. - * During cold startup, the list of files to be committed can be huge. - * So creating a HoodieCommitMetadata out of these large number of files, - * and calling the existing update(HoodieCommitMetadata) function does not scale well. - * Hence, we have a special commit just for the initialization scenario. + * Validates the timeline for both main and metadata tables. */ - private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) { - // List all partitions in the basePath of the containing dataset - LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath()); - engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName()); - - Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>(); - - // skip file system listing to populate metadata records if it's a fresh table. - // this is applicable only if the table already has N commits and metadata is enabled at a later point in time. 
- if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table. - // If not, last completed commit in data table will be chosen as the initial commit time. - LOG.info("Triggering empty Commit to metadata to initialize"); - } else { - List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient); - Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream() - .map(p -> { - String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath()); - return Pair.of(partitionName, p.getFileNameToSizeMap()); - }) - .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - - int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum(); - List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet()); - - if (partitionTypes.contains(MetadataPartitionType.FILES)) { - // Record which saves the list of all partitions - HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions); - HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord); - ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1)); - partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords); - } - - if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords( - engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime); - partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD); - } - - if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) { - final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords( - 
engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams()); - partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD); - } - LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata"); + private boolean validateTimelineBeforeSchedulingCompaction(Option<String> inFlightInstantTimestamp, String latestDeltacommitTime) { + // There should not be any incomplete instants on MDT + HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); + List<HoodieInstant> pendingInstantsOnMetadataTable = metadataTimeline.filterInflightsAndRequested().getInstants(); + if (!pendingInstantsOnMetadataTable.isEmpty()) { + LOG.info(String.format( + "Cannot compact MDT as there are %d inflight instants: %s", + pendingInstantsOnMetadataTable.size(), Arrays.toString(pendingInstantsOnMetadataTable.toArray()))); + metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1)); + return false; } - commit(createInstantTime, partitionToRecordsMap, false); - } + // There should not be any incomplete instants on dataset + HoodieActiveTimeline datasetTimeline = dataMetaClient.reloadActiveTimeline(); + List<HoodieInstant> pendingInstantsOnDataset = datasetTimeline.filterInflightsAndRequested().getInstantsAsStream() + .filter(i -> !inFlightInstantTimestamp.isPresent() || !i.getTimestamp().equals(inFlightInstantTimestamp.get())) + .collect(Collectors.toList()); + if (!pendingInstantsOnDataset.isEmpty()) { + LOG.info(String.format( + "Cannot compact MDT as there are %d inflight instants on dataset before latest deltacommit %s: %s", + pendingInstantsOnDataset.size(), latestDeltacommitTime, Arrays.toString(pendingInstantsOnDataset.toArray()))); + metrics.ifPresent(m -> m.incrementMetric(HoodieMetadataMetrics.SKIP_TABLE_SERVICES, 1)); + return false; + } - private HoodieData<HoodieRecord> getFilesPartitionRecords(String createInstantTime, List<DirectoryInfo> 
partitionInfoList, HoodieRecord allPartitionRecord) { - HoodieData<HoodieRecord> filesPartitionRecords = engineContext.parallelize(Arrays.asList(allPartitionRecord), 1); - if (partitionInfoList.isEmpty()) { - return filesPartitionRecords; + // Check if the inflight commit is greater than all the completed commits.

Review Comment: I have simplified it and removed the extra checks.

-- 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org