nsivabalan commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r716249167
##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########

@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);

     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if (!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to the metadata table for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that was committed to the metadata table but failed on the dataset.
+        // e.g., let's say compaction c1 succeeded on the metadata table on the 1st attempt but failed before committing to the data table.
+        // when retried, the data table will first roll back the pending compaction. that rollback is applied to the metadata table,
+        // but all changes to the metadata table are upserts, so only a new delta commit is created.
+        // once the rollback completes, the compaction is retried and eventually hits this code block, where the respective commit is
+        // already among the completed commits. So, we have to manually remove the completed instant and proceed.
+        // this is also why withAllowMultiWriteOnSameInstant is enabled for the metadata table.
+        HoodieInstant alreadyCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), alreadyCompletedInstant);
+        metaClient.reloadActiveTimeline();
+      }
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
         }
       });
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metaClient.reloadActiveTimeline();
+
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }

     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
   }

   /**
-   * Tag each record with the location.
+   * Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   *    a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   * 2. In a multi-writer scenario, a parallel operation with a greater instantTime may have already completed,
+   *    creating a deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
-      }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
     }
-
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = baseFiles.get(0).getCommitTime();
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        return jsc.parallelize(records, 1);
-      }
-
-      fileId = logFiles.get(0).getFileId();
-      instantTime = logFiles.get(0).getBaseCommitTime();
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {

Review comment:
   We will not have or use the data table's write client in this layer at all, so I prefer to leave it as is. But I have fixed all the other namings.
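To make the re-attempt handling in commit() above concrete: here is a minimal, self-contained sketch in plain Java, not Hudi code. The class, the map, and the state strings are hypothetical stand-ins for the metadata table's timeline; it only models why an already-completed instant must be removed before the same instant time is committed again.

    import java.util.Map;
    import java.util.TreeMap;

    // Toy model of the idempotent re-attempt path in commit(); all names here are hypothetical.
    public class ReattemptedCommitSketch {
      // instantTime -> state on the (modeled) metadata table timeline
      private final Map<String, String> timeline = new TreeMap<>();

      void commit(String instantTime) {
        if ("COMPLETED".equals(timeline.get(instantTime))) {
          // Re-attempt: the earlier attempt completed on the metadata table but failed on the
          // data table. Remove the completed instant so the same instantTime can be reused
          // (analogue of the FSUtils.deleteInstantFile + reloadActiveTimeline step in the diff above).
          timeline.remove(instantTime);
        }
        timeline.put(instantTime, "INFLIGHT");   // startCommitWithTime analogue
        timeline.put(instantTime, "COMPLETED");  // upsertPreppedRecords + commit analogue
      }

      public static void main(String[] args) {
        ReattemptedCommitSketch metadataTable = new ReattemptedCommitSketch();
        metadataTable.commit("20210922183025"); // first attempt succeeds on the metadata table
        metadataTable.commit("20210922183025"); // retry after a data-table failure: same instant, no clash
        System.out.println(metadataTable.timeline); // {20210922183025=COMPLETED}
      }
    }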
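The comment about the "001"/"002" suffixes relies on Hudi instant times being fixed-width timestamp strings that order lexicographically. A small sketch (plain Java with made-up timestamp values, not Hudi APIs) showing that the suffixed compaction and clean instants sort after the deltacommit they extend but before any later synced commit:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Illustrative only: models Hudi's lexicographically ordered instant-time strings.
    public class InstantSuffixSketch {
      public static void main(String[] args) {
        String latestDeltacommitTime = "20210922183025";              // last completed deltacommit (made-up value)
        String compactionInstantTime = latestDeltacommitTime + "001"; // suffix used for compaction
        String cleanInstantTime = latestDeltacommitTime + "002";      // suffix used for clean
        String nextSyncedDeltacommit = "20210922183026";              // any future commit synced from the data table

        List<String> instants = new ArrayList<>(List.of(
            nextSyncedDeltacommit, cleanInstantTime, latestDeltacommitTime, compactionInstantTime));
        Collections.sort(instants); // string order == chronological order for these timestamps

        // The suffixed instants land between the deltacommit they extend and the next commit, so no
        // future synced delta commit can have an instant time lesser than the compaction/clean instants.
        System.out.println(instants);
        // [20210922183025, 20210922183025001, 20210922183025002, 20210922183026]
      }
    }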