yihua commented on a change in pull request #4974: URL: https://github.com/apache/hudi/pull/4974#discussion_r828554793
########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java ########## @@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig( + enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ); + + // When max archival commits is set to 4, even after 8 delta commits, since the number of delta + // commits is still smaller than 8, the archival should not kick in. + // The archival should only kick in after the 9th delta commit + // instant "00000001" to "00000009" + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 + ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); + List<HoodieInstant> originalCommits = commitsList.getKey(); + List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); + + if (i <= 8) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + assertEquals(1, originalCommits.size() - commitsAfterArchival.size()); + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001"))); + IntStream.range(2, 10).forEach(j -> + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)))); + } + } + + testTable.doCompaction("00000010", Arrays.asList("p1", "p2")); + + // instant "00000011" to "00000019" + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1 + ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); + List<HoodieInstant> originalCommits = commitsList.getKey(); + List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); + + // first 9 delta commits before the completed compaction should be archived + IntStream.range(1, 10).forEach(j -> Review comment: The intention is to check the instants in the timeline for every commit so that they are intact. ########## File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java ########## @@ -946,6 +961,152 @@ public void testArchiveCommitsWithCompactionCommitInMetadataTableTimeline() thro } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enableMetadata) throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig( + enableMetadata, 2, 4, 8, 1, HoodieTableType.MERGE_ON_READ); + + // When max archival commits is set to 4, even after 8 delta commits, since the number of delta + // commits is still smaller than 8, the archival should not kick in. + // The archival should only kick in after the 9th delta commit + // instant "00000001" to "00000009" + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 + ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); + List<HoodieInstant> originalCommits = commitsList.getKey(); + List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); + + if (i <= 8) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + assertEquals(1, originalCommits.size() - commitsAfterArchival.size()); + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000001"))); + IntStream.range(2, 10).forEach(j -> + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)))); + } + } + + testTable.doCompaction("00000010", Arrays.asList("p1", "p2")); + + // instant "00000011" to "00000019" + for (int i = 1; i < 10; i++) { + testTable.doWriteOperation("0000001" + i, WriteOperationType.UPSERT, i == 1 + ? 
Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + // archival + Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig); + List<HoodieInstant> originalCommits = commitsList.getKey(); + List<HoodieInstant> commitsAfterArchival = commitsList.getValue(); + + // first 9 delta commits before the completed compaction should be archived + IntStream.range(1, 10).forEach(j -> + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + j)))); + + if (i == 1) { + assertEquals(8, originalCommits.size() - commitsAfterArchival.size()); + // instant from "00000011" should be in the active timeline + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010"))); + assertTrue(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "00000011"))); + } else if (i < 8) { + assertEquals(originalCommits, commitsAfterArchival); + } else { + assertEquals(1, originalCommits.size() - commitsAfterArchival.size()); + assertFalse(commitsAfterArchival.contains( + new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "00000010"))); + // i == 8 -> ["00000011", "00000018"] should be in the active timeline Review comment: Yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org