This is an automated email from the ASF dual-hosted git repository. nagarwal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4de0fcf [HUDI-566] Added new test cases for class HoodieTimeline, HoodieDefaultTimeline and HoodieActiveTimeline. 4de0fcf is described below commit 4de0fcfcb54ac76ed3b6852917588c32fec9bea8 Author: Prashant Wason <pwa...@uber.com> AuthorDate: Mon Jan 27 15:38:33 2020 -0800 [HUDI-566] Added new test cases for class HoodieTimeline, HoodieDefaultTimeline and HoodieActiveTimeline. --- .../apache/hudi/common/table/HoodieTimeline.java | 4 + .../table/string/TestHoodieActiveTimeline.java | 276 ++++++++++++++++++++- 2 files changed, 279 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java old mode 100644 new mode 100755 index a964411..015a497 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java @@ -56,6 +56,10 @@ public interface HoodieTimeline extends Serializable { String REQUESTED_EXTENSION = ".requested"; String RESTORE_ACTION = "restore"; + String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION, + CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION, + COMPACTION_ACTION}; + String COMMIT_EXTENSION = "." + COMMIT_ACTION; String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION; String CLEAN_EXTENSION = "." 
+ CLEAN_ACTION;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
old mode 100644
new mode 100755
index 55a91cf..a9f027e
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.string;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.HoodieCommonTestHarness;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.model.TimelineLayoutVersion;
@@ -29,12 +28,22 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.Option;
 
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.TimelineLayoutVersion.VERSION_0;
@@ -164,4 +173,269 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     assertFalse("", activeCommitTimeline.isBeforeTimelineStarts("02"));
     assertTrue("", activeCommitTimeline.isBeforeTimelineStarts("00"));
   }
+
+  @Test
+  public void testTimelineGetOperations() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    timeline.setInstants(allInstants);
+
+    /**
+     * Helper that checks a HoodieTimeline contains exactly the generated instants having one of the given actions.
+     * @param timeline The HoodieTimeline to check
+     * @param actions The actions that should be present in the timeline being checked
+     */
+    BiConsumer<HoodieTimeline, Set<String>> checkTimeline = (HoodieTimeline timeline, Set<String> actions) -> {
+      sup.get().filter(i -> actions.contains(i.getAction())).forEach(i -> assertTrue(timeline.containsInstant(i)));
+      sup.get().filter(i -> !actions.contains(i.getAction())).forEach(i -> assertFalse(timeline.containsInstant(i)));
+    };
+
+    // Test that the various getXXX operations of HoodieActiveTimeline
+    // return the correct set of Instants
+    checkTimeline.accept(timeline.getCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
+    checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION));
+    checkTimeline.accept(timeline.getCommitTimeline(), Sets.newHashSet(HoodieTimeline.COMMIT_ACTION));
+
+    checkTimeline.accept(timeline.getDeltaCommitTimeline(), Sets.newHashSet(HoodieTimeline.DELTA_COMMIT_ACTION));
+    checkTimeline.accept(timeline.getCleanerTimeline(), Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
+    checkTimeline.accept(timeline.getRollbackTimeline(), Sets.newHashSet(HoodieTimeline.ROLLBACK_ACTION));
+    checkTimeline.accept(timeline.getRestoreTimeline(), Sets.newHashSet(HoodieTimeline.RESTORE_ACTION));
+    checkTimeline.accept(timeline.getSavePointTimeline(), Sets.newHashSet(HoodieTimeline.SAVEPOINT_ACTION));
+    checkTimeline.accept(timeline.getAllCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
+            HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
+            HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
+
+    // Collect the actions of a random subset of instants, then check getTimelineOfActions with them
+    Random rand = new Random();
+    Set<String> randomActions = sup.get().filter(i -> rand.nextBoolean())
+        .map(HoodieInstant::getAction)
+        .collect(Collectors.toSet());
+    checkTimeline.accept(timeline.getTimelineOfActions(randomActions), randomActions);
+  }
+
+  @Test
+  public void testTimelineInstantOperations() {
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    assertEquals("No instant present", 0, timeline.countInstants());
+
+    // revertToInflight: the completed commit is replaced by its inflight counterpart
+    HoodieInstant commit = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    timeline.createNewInstant(commit);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(commit));
+    HoodieInstant inflight = timeline.revertToInflight(commit);
+    // revert creates the .requested file
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deleteInflight: the inflight instant disappears while the instant count stays at 1
+    timeline.deleteInflight(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deletePending: recreate both instants, then delete the pending (inflight) one
+    timeline.createNewInstant(commit);
+    timeline.createNewInstant(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    timeline.deletePending(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertTrue(timeline.containsInstant(commit));
+
+    // deleteCompactionRequested
+    HoodieInstant compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "2");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertEquals(2, timeline.countInstants());
+    timeline.deleteCompactionRequested(compaction);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(commit));
+
+    // transitionCompactionXXXtoYYY and revertCompactionXXXtoYYY
+    compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(inflight));
+    compaction = timeline.revertCompactionInflightToRequested(inflight);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    compaction = timeline.transitionCompactionInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // transitionCleanXXXtoYYY
+    HoodieInstant clean = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "4");
+    timeline.saveToCleanRequested(clean, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    inflight = timeline.transitionCleanRequestedToInflight(clean, Option.empty());
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(clean));
+    assertTrue(timeline.containsInstant(inflight));
+    clean = timeline.transitionCleanInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // Static HoodieTimeline factories must produce instants in the expected state and action
+    HoodieInstant srcInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant otherInstant = HoodieTimeline.getRequestedInstant(srcInstant);
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+
+    // containsOrBeforeTimelineStarts
+    List<HoodieInstant> allInstants = getAllInstants();
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    timeline.setInstants(allInstants);
+
+    // Create one more requested commit instant before running the containment checks
+    timeline.createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "2"));
+    allInstants.stream().map(HoodieInstant::getTimestamp).forEach(s -> assertTrue(timeline.containsOrBeforeTimelineStarts(s)));
+    assertTrue(timeline.containsOrBeforeTimelineStarts("0"));
+    assertFalse(timeline.containsOrBeforeTimelineStarts(String.valueOf(System.currentTimeMillis() + 1000)));
+    assertFalse(timeline.getTimelineHash().isEmpty());
+  }
+
+  @Test
+  public void testCreateInstants() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    for (HoodieInstant instant : allInstants) {
+      timeline.createNewInstant(instant);
+    }
+
+    timeline = timeline.reload();
+    for (HoodieInstant instant : allInstants) {
+      assertTrue(timeline.containsInstant(instant));
+    }
+  }
+
+  @Test
+  public void testInstantFilenameOperations() {
+    HoodieInstant instantRequested = new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantInflight = new HoodieInstant(State.INFLIGHT, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantComplete = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantRequested.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantInflight.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantComplete.getFileName()));
+
+    assertEquals(instantComplete.getFileName(),
+        HoodieTimeline.makeFileNameAsComplete(instantInflight.getFileName()));
+
+    assertEquals(instantInflight.getFileName(),
+        HoodieTimeline.makeFileNameAsInflight(instantComplete.getFileName()));
+  }
+
+  @Test
+  public void testFiltering() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
+
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(allInstants);
+
+    // getReverseOrderedInstants: must equal the generated list in reverse order
+    Stream<HoodieInstant> instants = timeline.getReverseOrderedInstants();
+    List<HoodieInstant> v1 = instants.collect(Collectors.toList());
+    List<HoodieInstant> v2 = sup.get().collect(Collectors.toList());
+    Collections.reverse(v2);
+    assertEquals(v2, v1);
+
+    /**
+     * Helper that checks a HoodieTimeline contains exactly the generated instants being in one of the given states.
+     * @param timeline The HoodieTimeline to check
+     * @param states The states that should be present in the timeline being checked
+     */
+    BiConsumer<HoodieTimeline, Set<State>> checkFilter = (HoodieTimeline timeline, Set<State> states) -> {
+      sup.get().filter(i -> states.contains(i.getState())).forEach(i -> assertTrue(timeline.containsInstant(i)));
+      sup.get().filter(i -> !states.contains(i.getState())).forEach(i -> assertFalse(timeline.containsInstant(i)));
+    };
+
+    checkFilter.accept(timeline.filter(i -> false), Sets.newHashSet());
+    checkFilter.accept(timeline.filterInflights(), Sets.newHashSet(State.INFLIGHT));
+    checkFilter.accept(timeline.filterInflightsAndRequested(),
+        Sets.newHashSet(State.INFLIGHT, State.REQUESTED));
+
+    // filterCompletedAndCompactionInstants
+    // This cannot be done using checkFilter as it involves both states and actions
+    final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
+    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<String> actions = Sets.newHashSet(HoodieTimeline.COMPACTION_ACTION);
+    sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
+        .forEach(i -> assertTrue(t1.containsInstant(i)));
+    sup.get().filter(i -> !(states.contains(i.getState()) || actions.contains(i.getAction())))
+        .forEach(i -> assertFalse(t1.containsInstant(i)));
+
+    // filterPendingCompactionTimeline (actions are compared by value, not by reference)
+    final HoodieTimeline t2 = timeline.filterPendingCompactionTimeline();
+    sup.get().filter(i -> HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertTrue(t2.containsInstant(i)));
+    sup.get().filter(i -> !HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertFalse(t2.containsInstant(i)));
+  }
+
+  /**
+   * Resets {@code timeline} and builds one HoodieInstant per generated (state, action) pair, timestamps ascending.
+   * @return list of HoodieInstant
+   */
+  private List<HoodieInstant> getAllInstants() {
+    timeline = new HoodieActiveTimeline(metaClient);
+    List<HoodieInstant> allInstants = new ArrayList<>();
+    long commitTime = 1;
+    for (State state : State.values()) {
+      if (state == State.INVALID) {
+        continue;
+      }
+      for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
+        // Skip (state, action) combinations treated as invalid here. NOTE(review): rollback is
+        // skipped for REQUESTED, INFLIGHT and COMPLETED, so no rollback instant is ever generated.
+        if (state == State.REQUESTED) {
+          if (action.equals(HoodieTimeline.SAVEPOINT_ACTION) || action.equals(HoodieTimeline.RESTORE_ACTION)
+              || action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+            continue;
+          }
+        }
+        if (state == State.INFLIGHT && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          continue;
+        }
+        if (state == State.COMPLETED && action.equals(HoodieTimeline.ROLLBACK_ACTION)) {
+          continue;
+        }
+        // Compaction complete is called commit complete
+        if (state == State.COMPLETED && action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+          action = HoodieTimeline.COMMIT_ACTION;
+        }
+
+        allInstants.add(new HoodieInstant(state, action, String.format("%03d", commitTime++)));
+      }
+    }
+    return allInstants;
+  }
 }