This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 65866c4 [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (#2422) 65866c4 is described below commit 65866c45ec04820b01ab701e7de5cf6a406d2a8e Author: vinoth chandar <vinothchan...@users.noreply.github.com> AuthorDate: Sat Jan 9 16:53:34 2021 -0800 [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible (#2422) * [HUDI-1276] [HUDI-1459] Make Clustering/ReplaceCommit and Metadata table be compatible * Use filesystemview and json format from metadata. Add tests Co-authored-by: Satish Kotha <satishko...@uber.com> --- .../hudi/table/HoodieTimelineArchiveLog.java | 4 +- .../action/clean/BaseCleanActionExecutor.java | 4 +- .../hudi/table/action/clean/CleanPlanner.java | 77 ++++++++++---- .../hudi/table/action/rollback/RollbackUtils.java | 1 + .../hudi/metadata/TestHoodieBackedMetadata.java | 11 ++ .../java/org/apache/hudi/table/TestCleaner.java | 112 ++++++++++++++++++++- .../table/timeline/TimelineMetadataUtils.java | 7 +- .../table/view/AbstractTableFileSystemView.java | 20 ++++ .../table/view/PriorityBasedFileSystemView.java | 10 ++ .../view/RemoteHoodieTableFileSystemView.java | 30 ++++++ .../common/table/view/TableFileSystemView.java | 12 ++- .../apache/hudi/common/util/ClusteringUtils.java | 2 +- .../hudi/metadata/HoodieTableMetadataUtil.java | 7 ++ .../table/view/TestHoodieTableFileSystemView.java | 7 ++ .../hudi/common/testutils/HoodieTestTable.java | 5 + .../timeline/service/FileSystemViewHandler.java | 15 +++ .../service/handlers/FileSliceHandler.java | 10 ++ 17 files changed, 301 insertions(+), 33 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 50967b1..3f4c271 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -290,10 +290,10 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> { LOG.info("Wrapper schema " + wrapperSchema.toString()); List<IndexedRecord> records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { + // TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed. boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant); if (!deleteSuccess) { - // throw error and stop archival if deleting replaced file groups failed. - throw new HoodieCommitException("Unable to delete file(s) for " + hoodieInstant.getFileName()); + LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner"); } try { deleteAnyLeftOverMarkerFiles(context, hoodieInstant); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java index 18e638e..786bf3e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java @@ -21,9 +21,9 @@ package org.apache.hudi.table.action.clean; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieActionInstant; +import org.apache.hudi.avro.model.HoodieCleanFileInfo; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.avro.model.HoodieCleanFileInfo; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCleaningPolicy; @@ -72,7 +72,7 @@ public abstract class BaseCleanActionExecutor<T extends HoodieRecordPayload, I, List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant); if (partitionsToClean.isEmpty()) { - LOG.info("Nothing to clean here. It is already clean"); + LOG.info("Nothing to clean here."); return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build(); } LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 31d433d..321f248 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -111,14 +112,14 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser /** * Returns list of partitions where clean operations needs to be performed. * - * @param newInstantToRetain New instant to be retained after this cleanup operation + * @param earliestRetainedInstant New instant to be retained after this cleanup operation * @return list of partitions to scan for cleaning * @throws IOException when underlying file-system throws this exception */ - public List<String> getPartitionPathsToClean(Option<HoodieInstant> newInstantToRetain) throws IOException { + public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException { switch (config.getCleanerPolicy()) { case KEEP_LATEST_COMMITS: - return getPartitionPathsForCleanByCommits(newInstantToRetain); + return getPartitionPathsForCleanByCommits(earliestRetainedInstant); case KEEP_LATEST_FILE_VERSIONS: return getPartitionPathsForFullCleaning(); default: @@ -168,10 +169,16 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> { try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), - HoodieCommitMetadata.class); - return commitMetadata.getPartitionToWriteStats().keySet().stream(); + if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) { + HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes( + hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream()); + } else { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), + HoodieCommitMetadata.class); + return commitMetadata.getPartitionToWriteStats().keySet().stream(); + } } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -196,13 +203,17 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser private List<CleanFileInfo> getFilesToCleanKeepingLatestVersions(String partitionPath) { LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + " file versions. "); - List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); List<CleanFileInfo> deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List<String> savepointedFiles = hoodieTable.getSavepoints().stream() .flatMap(this::getSavepointedDataFiles) .collect(Collectors.toList()); + // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely + // In other words, the file versions only apply to the active file groups. + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty())); + + List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { int keepVersions = config.getCleanerFileVersionsRetained(); // do not cleanup slice required for pending compaction @@ -226,18 +237,7 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser // Delete the remaining files while (fileSliceIterator.hasNext()) { FileSlice nextSlice = fileSliceIterator.next(); - if (nextSlice.getBaseFile().isPresent()) { - HoodieBaseFile dataFile = nextSlice.getBaseFile().get(); - deletePaths.add(new CleanFileInfo(dataFile.getPath(), false)); - if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { - deletePaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true)); - } - } - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { - // If merge on read, then clean the log files for the commits as well - deletePaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); - } + deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); } } return deletePaths; @@ -269,7 +269,11 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser // determine if we have enough commits, to start cleaning. if (commitTimeline.countInstants() > commitsRetained) { - HoodieInstant earliestCommitToRetain = getEarliestCommitToRetain().get(); + Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain(); + HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get(); + // all replaced file groups before earliestCommitToRetain are eligible to clean + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption)); + // add active files List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); for (HoodieFileGroup fileGroup : fileGroups) { List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); @@ -322,6 +326,20 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser } return deletePaths; } + + private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) { + final Stream<HoodieFileGroup> replacedGroups; + if (earliestCommitToRetain.isPresent()) { + replacedGroups = fileSystemView.getReplacedFileGroupsBefore(earliestCommitToRetain.get().getTimestamp(), partitionPath); + } else { + replacedGroups = fileSystemView.getAllReplacedFileGroups(partitionPath); + } + return replacedGroups.flatMap(HoodieFileGroup::getAllFileSlices) + // do not delete savepointed files (archival will make sure corresponding replacecommit file is not deleted) + .filter(slice -> !slice.getBaseFile().isPresent() || !savepointedFiles.contains(slice.getBaseFile().get().getFileName())) + .flatMap(slice -> getCleanFileInfoForSlice(slice).stream()) + .collect(Collectors.toList()); + } /** * Gets the latest version < instantTime. This version file could still be used by queries. @@ -339,6 +357,23 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser return null; } + private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) { + List<CleanFileInfo> cleanPaths = new ArrayList<>(); + if (nextSlice.getBaseFile().isPresent()) { + HoodieBaseFile dataFile = nextSlice.getBaseFile().get(); + cleanPaths.add(new CleanFileInfo(dataFile.getPath(), false)); + if (dataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { + cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true)); + } + } + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + // If merge on read, then clean the log files for the commits as well + cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); + } + return cleanPaths; + } + /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. */ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index 18f284e..ee7f4dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -122,6 +122,7 @@ public class RollbackUtils { List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>(); switch (instantToRollback.getAction()) { case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.REPLACE_COMMIT_ACTION: LOG.info("Rolling back commit action."); partitionRollbackRequests.add( ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 32cec71..5932236 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -18,6 +18,7 @@ package org.apache.hudi.metadata; +import org.apache.hudi.client.HoodieWriteResult; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -498,6 +499,15 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); assertNoWriteErrors(writeStatuses); assertFalse(metadata(client).isInSync()); + + // insert overwrite to test replacecommit + newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION); + records = dataGen.generateInserts(newCommitTime, 5); + HoodieWriteResult replaceResult = client.insertOverwrite(jsc.parallelize(records, 1), newCommitTime); + writeStatuses = replaceResult.getWriteStatuses().collect(); + assertNoWriteErrors(writeStatuses); + assertFalse(metadata(client).isInSync()); } // Enable metadata table and ensure it is synced @@ -800,6 +810,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { // FileSystemView should expose the same data List<HoodieFileGroup> fileGroups = tableView.getAllFileGroups(partition).collect(Collectors.toList()); + fileGroups.addAll(tableView.getAllReplacedFileGroups(partition).collect(Collectors.toList())); fileGroups.forEach(g -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(g)); fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> LogManager.getLogger(TestHoodieBackedMetadata.class).info(b))); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 69c6f98..3a5d737 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -18,6 +18,8 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata; @@ -38,9 +40,11 @@ import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -57,6 +61,7 @@ import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CompactionUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; @@ -65,9 +70,6 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndex; import org.apache.hudi.table.action.clean.CleanPlanner; import org.apache.hudi.testutils.HoodieClientTestBase; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -76,6 +78,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import scala.Tuple3; import java.io.File; import java.io.IOException; @@ -96,8 +99,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import scala.Tuple3; - import static org.apache.hudi.common.testutils.HoodieTestTable.makeIncrementalCommitTimes; import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS; @@ -687,6 +688,107 @@ public class TestCleaner extends HoodieClientTestBase { assertTrue(testTable.baseFileExists(p0, "002", file1P0)); assertTrue(testTable.logFileExists(p0, "002", file1P0, 4)); } + + @Test + public void testCleanWithReplaceCommits() throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withAssumeDatePartitioning(true) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) + .build(); + + HoodieTestTable testTable = HoodieTestTable.of(metaClient); + String p0 = "2020/01/01"; + String p1 = "2020/01/02"; + + // make 1 commit, with 1 file per partition + String file1P0C0 = UUID.randomUUID().toString(); + String file1P1C0 = UUID.randomUUID().toString(); + testTable.addInflightCommit("00000000000001").withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0); + + HoodieCommitMetadata commitMetadata = generateCommitMetadata( + Collections.unmodifiableMap(new HashMap<String, List<String>>() { + { + put(p0, CollectionUtils.createImmutableList(file1P0C0)); + put(p1, CollectionUtils.createImmutableList(file1P1C0)); + } + }) + ); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "00000000000001"), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + + metaClient = HoodieTableMetaClient.reload(metaClient); + + List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config); + assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1 + Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0); + String file2P0C1 = partitionAndFileId002.get(p0); + testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1)); + + // run cleaner + List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config); + assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0 + Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1); + String file3P1C2 = partitionAndFileId003.get(p1); + testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2)); + + // run cleaner + List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config); + assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any partitions and clean any files"); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next replacecommit, with 1 clustering operation. Replace data in p0 again + Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0); + String file4P0C3 = partitionAndFileId004.get(p0); + testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3)); + + // run cleaner + List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config); + assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + //file1P1C0 still stays because its not replaced until 3 and its the only version available + assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + + // make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created + Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1); + String file4P1C4 = partitionAndFileId005.get(p1); + testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4)); + + List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2); + assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3)); + assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1)); + assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2)); + assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0)); + assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0)); + } + + private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) { + HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); + replaceMetadata.addReplaceFileId(partition, replacedFileId); + replaceMetadata.setOperationType(WriteOperationType.CLUSTER); + if (!StringUtils.isNullOrEmpty(newFileId)) { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(partition); + writeStat.setPath(newFileId); + writeStat.setFileId(newFileId); + replaceMetadata.addWriteStat(partition, writeStat); + } + return replaceMetadata; + } @Test public void testCleanMetadataUpgradeDowngrade() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 32e60c3..962d69d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieInstantInfo; +import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; @@ -158,10 +159,14 @@ public class TimelineMetadataUtils { return deserializeAvroMetadata(bytes, HoodieSavepointMetadata.class); } - public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadta(byte[] bytes) throws IOException { + public static HoodieRequestedReplaceMetadata deserializeRequestedReplaceMetadata(byte[] bytes) throws IOException { return deserializeAvroMetadata(bytes, HoodieRequestedReplaceMetadata.class); } + public static HoodieReplaceCommitMetadata deserializeHoodieReplaceMetadata(byte[] bytes) throws IOException { + return deserializeAvroMetadata(bytes, HoodieReplaceCommitMetadata.class); + } + public static <T extends SpecificRecordBase> T deserializeAvroMetadata(byte[] bytes, Class<T> clazz) throws IOException { DatumReader<T> reader = new SpecificDatumReader<>(clazz); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 65e9231..3f45715 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -62,6 +62,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN; import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS; import static org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS; @@ -691,6 +692,16 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV } @Override + public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) { + return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplacedBefore(fg.getFileGroupId(), maxCommitTime)); + } + + @Override + public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) { + return getAllFileGroupsIncludingReplaced(partitionPath).filter(fg -> isFileGroupReplaced(fg.getFileGroupId())); + } + + @Override public final Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() { try { readLock.lock(); @@ -1041,6 +1052,15 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV return isFileGroupReplacedBeforeOrOn(fileGroupId, instants.stream().max(Comparator.naturalOrder()).get()); } + private boolean isFileGroupReplacedBefore(HoodieFileGroupId fileGroupId, String instant) { + Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId); + if (!hoodieInstantOption.isPresent()) { + return false; + } + + return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, hoodieInstantOption.get().getTimestamp()); + } + private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, String instant) { Option<HoodieInstant> hoodieInstantOption = getReplaceInstant(fileGroupId); if (!hoodieInstantOption.isPresent()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index f7244ee..3783d00 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -200,6 +200,16 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri } @Override + public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) { + return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBefore, secondaryView::getReplacedFileGroupsBefore); + } + + @Override + public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) { + return execute(partitionPath, preferredView::getAllReplacedFileGroups, secondaryView::getAllReplacedFileGroups); + } + + @Override public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() { return execute(preferredView::getPendingCompactionOperations, secondaryView::getPendingCompactionOperations); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 91a28a8..23b0536 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -91,6 +91,12 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON = String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/"); + public static final String ALL_REPLACED_FILEGROUPS_BEFORE = + String.format("%s/%s", BASE_URL, "filegroups/replaced/before/"); + + public static final String ALL_REPLACED_FILEGROUPS_PARTITION = + String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/"); + public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/"); @@ -380,6 +386,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } } + @Override + public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) { + Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); + try { + List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap, + new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); + return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + @Override + public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) { + Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); + try { + List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap, + new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); + return fileGroups.stream().map(dto -> FileGroupDTO.toFileGroup(dto, metaClient)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + public boolean refresh() { Map<String, String> paramsMap = getParams(); try { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 504f95a..7330286 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -167,11 +167,21 @@ public interface TableFileSystemView { HoodieTimeline getTimeline(); /** - * Stream all the replaced file groups before maxCommitTime. + * Stream all the replaced file groups before or on maxCommitTime for given partition. */ Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath); /** + * Stream all the replaced file groups before maxCommitTime for given partition. + */ + Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath); + + /** + * Stream all the replaced file groups for given partition. + */ + Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath); + + /** * Filegroups that are in pending clustering. */ Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index fcc3274..70dfa2a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -86,7 +86,7 @@ public class ClusteringUtils { LOG.warn("No content found in requested file for instant " + pendingReplaceInstant); return Option.empty(); } - HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get()); + HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadata(content.get()); if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) { return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan())); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 115001a..ed2a878 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -25,6 +25,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -92,6 +93,12 @@ public class HoodieTableMetadataUtil { case HoodieTimeline.SAVEPOINT_ACTION: // Nothing to be done here break; + case HoodieTimeline.REPLACE_COMMIT_ACTION: + HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes( + timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class); + // Note: we only add new files created here. Replaced files are removed from metadata later by cleaner. + records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp())); + break; default: throw new HoodieException("Unknown type of action " + instant.getAction()); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 3fceee3..e103427 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -1356,6 +1356,13 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness { List<HoodieFileGroup> allReplaced = fsView.getReplacedFileGroupsBeforeOrOn("2", partitionPath1).collect(Collectors.toList()); assertEquals(1, allReplaced.size()); assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId()); + + allReplaced = fsView.getReplacedFileGroupsBefore("2", partitionPath1).collect(Collectors.toList()); + assertEquals(0, allReplaced.size()); + + allReplaced = fsView.getAllReplacedFileGroups(partitionPath1).collect(Collectors.toList()); + assertEquals(1, allReplaced.size()); + assertEquals(fileId1, allReplaced.get(0).getFileGroupId().getFileId()); } @Test diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 3663917..858e113 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -228,6 +228,11 @@ public class HoodieTestTable { return this; } + public HoodieTestTable forReplaceCommit(String instantTime) { + currentInstantTime = instantTime; + return this; + } + public HoodieTestTable forCompaction(String instantTime) { currentInstantTime = instantTime; return this; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java index e008fc5..b3e860a 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java @@ -299,6 +299,21 @@ public class FileSystemViewHandler { writeValueAsString(ctx, dtos); }, true)); + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> { + List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore( + ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), + ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""), + ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + writeValueAsString(ctx, dtos); + }, true)); + + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> { + List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups( + ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), + ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + writeValueAsString(ctx, dtos); + }, true)); + app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> { List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java index 18c5eb1..2180e4e 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java @@ -94,6 +94,16 @@ public class FileSliceHandler extends Handler { .collect(Collectors.toList()); } + public List<FileGroupDTO> getReplacedFileGroupsBefore(String basePath, String maxCommitTime, String partitionPath) { + return viewManager.getFileSystemView(basePath).getReplacedFileGroupsBefore(maxCommitTime, partitionPath).map(FileGroupDTO::fromFileGroup) + .collect(Collectors.toList()); + } + + public List<FileGroupDTO> getAllReplacedFileGroups(String basePath, String partitionPath) { + return viewManager.getFileSystemView(basePath).getAllReplacedFileGroups(partitionPath).map(FileGroupDTO::fromFileGroup) + .collect(Collectors.toList()); + } + public List<ClusteringOpDTO> getFileGroupsInPendingClustering(String basePath) { return viewManager.getFileSystemView(basePath).getFileGroupsInPendingClustering() .map(fgInstant -> ClusteringOpDTO.fromClusteringOp(fgInstant.getLeft(), fgInstant.getRight()))