This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4de0fcf  [HUDI-566] Added new test cases for class HoodieTimeline, HoodieDefaultTimeline and HoodieActiveTimeline.
4de0fcf is described below

commit 4de0fcfcb54ac76ed3b6852917588c32fec9bea8
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Mon Jan 27 15:38:33 2020 -0800

    [HUDI-566] Added new test cases for class HoodieTimeline, HoodieDefaultTimeline and HoodieActiveTimeline.
---
 .../apache/hudi/common/table/HoodieTimeline.java   |   4 +
 .../table/string/TestHoodieActiveTimeline.java     | 276 ++++++++++++++++++++-
 2 files changed, 279 insertions(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
old mode 100644
new mode 100755
index a964411..015a497
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
@@ -56,6 +56,10 @@ public interface HoodieTimeline extends Serializable {
   String REQUESTED_EXTENSION = ".requested";
   String RESTORE_ACTION = "restore";
 
+  /**
+   * Action-name constants grouped as the actions valid in a timeline (per the constant's name).
+   * The order of the elements is the order in which callers iterating this array will see them.
+   * NOTE(review): this is a plain array, so its elements can be overwritten by any caller; treat
+   * it as read-only.
+   */
+  String[] VALID_ACTIONS_IN_TIMELINE = {
+      COMMIT_ACTION,
+      DELTA_COMMIT_ACTION,
+      CLEAN_ACTION,
+      SAVEPOINT_ACTION,
+      RESTORE_ACTION,
+      ROLLBACK_ACTION,
+      COMPACTION_ACTION
+  };
+
   String COMMIT_EXTENSION = "." + COMMIT_ACTION;
   String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
   String CLEAN_EXTENSION = "." + CLEAN_ACTION;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
old mode 100644
new mode 100755
index 55a91cf..a9f027e
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.string;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.HoodieCommonTestHarness;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.model.TimelineLayoutVersion;
@@ -29,12 +28,22 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.Option;
 
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.model.TimelineLayoutVersion.VERSION_0;
@@ -164,4 +173,269 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
     assertFalse("", activeCommitTimeline.isBeforeTimelineStarts("02"));
     assertTrue("", activeCommitTimeline.isBeforeTimelineStarts("00"));
   }
+
+  /**
+   * Checks that each getXXX view of the active timeline contains exactly those instants (out of
+   * the list produced by {@code getAllInstants()}) whose action is in the expected set.
+   */
+  @Test
+  public void testTimelineGetOperations() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    timeline.setInstants(allInstants);
+
+    // Helper: every instant whose action is in 'actions' must be contained in 'view', and every
+    // other instant must be absent from it. The parameter is deliberately not named 'timeline'
+    // so that it does not shadow the test's timeline field.
+    BiConsumer<HoodieTimeline, Set<String>> checkTimeline = (view, actions) -> {
+      sup.get().filter(i -> actions.contains(i.getAction()))
+          .forEach(i -> assertTrue("Expected instant in timeline: " + i, view.containsInstant(i)));
+      sup.get().filter(i -> !actions.contains(i.getAction()))
+          .forEach(i -> assertFalse("Unexpected instant in timeline: " + i, view.containsInstant(i)));
+    };
+
+    // Test that various types of getXXX operations from HoodieActiveTimeline
+    // return the correct set of Instant
+    checkTimeline.accept(timeline.getCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION));
+    checkTimeline.accept(timeline.getCommitsAndCompactionTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
+            HoodieTimeline.COMPACTION_ACTION));
+    checkTimeline.accept(timeline.getCommitTimeline(), Sets.newHashSet(HoodieTimeline.COMMIT_ACTION));
+
+    checkTimeline.accept(timeline.getDeltaCommitTimeline(), Sets.newHashSet(HoodieTimeline.DELTA_COMMIT_ACTION));
+    checkTimeline.accept(timeline.getCleanerTimeline(), Sets.newHashSet(HoodieTimeline.CLEAN_ACTION));
+    checkTimeline.accept(timeline.getRollbackTimeline(), Sets.newHashSet(HoodieTimeline.ROLLBACK_ACTION));
+    checkTimeline.accept(timeline.getRestoreTimeline(), Sets.newHashSet(HoodieTimeline.RESTORE_ACTION));
+    checkTimeline.accept(timeline.getSavePointTimeline(), Sets.newHashSet(HoodieTimeline.SAVEPOINT_ACTION));
+    checkTimeline.accept(timeline.getAllCommitsTimeline(),
+        Sets.newHashSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION,
+            HoodieTimeline.CLEAN_ACTION, HoodieTimeline.COMPACTION_ACTION,
+            HoodieTimeline.SAVEPOINT_ACTION, HoodieTimeline.ROLLBACK_ACTION));
+
+    // Pick a pseudo-random subset of actions. The seed is fixed so that a failure can be
+    // reproduced on the next run instead of depending on whichever subset was drawn.
+    Random rand = new Random(42L);
+    Set<String> randomActions = sup.get().filter(i -> rand.nextBoolean())
+        .map(HoodieInstant::getAction)
+        .collect(Collectors.toSet());
+    checkTimeline.accept(timeline.getTimelineOfActions(randomActions), randomActions);
+  }
+
+  /**
+   * Drives a timeline through create / revert / delete / transition operations and, after each
+   * reload, checks the instant count and which instants the timeline reports as present.
+   * All {@code assertEquals} calls use JUnit's (expected, actual) order so failure messages
+   * read correctly.
+   */
+  @Test
+  public void testTimelineInstantOperations() {
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    assertEquals("No instant present", 0, timeline.countInstants());
+
+    // revertToInflight
+    HoodieInstant commit = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1");
+    timeline.createNewInstant(commit);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(commit));
+    HoodieInstant inflight = timeline.revertToInflight(commit);
+    // revert creates the .requested file
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertTrue(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deleteInflight
+    timeline.deleteInflight(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(commit));
+
+    // deletePending
+    timeline.createNewInstant(commit);
+    timeline.createNewInstant(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    timeline.deletePending(inflight);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertTrue(timeline.containsInstant(commit));
+
+    // deleteCompactionRequested
+    HoodieInstant compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "2");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertEquals(2, timeline.countInstants());
+    timeline.deleteCompactionRequested(compaction);
+    timeline = timeline.reload();
+    assertEquals(1, timeline.countInstants());
+    assertFalse(timeline.containsInstant(inflight));
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(commit));
+
+    // transitionCompactionXXXtoYYY and revertCompactionXXXtoYYY
+    compaction = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "3");
+    timeline.createNewInstant(compaction);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(compaction));
+    assertTrue(timeline.containsInstant(inflight));
+    compaction = timeline.revertCompactionInflightToRequested(inflight);
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+    inflight = timeline.transitionCompactionRequestedToInflight(compaction);
+    compaction = timeline.transitionCompactionInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(compaction));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // transitionCleanXXXtoYYY
+    HoodieInstant clean = new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "4");
+    timeline.saveToCleanRequested(clean, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    inflight = timeline.transitionCleanRequestedToInflight(clean, Option.empty());
+    timeline = timeline.reload();
+    assertFalse(timeline.containsInstant(clean));
+    assertTrue(timeline.containsInstant(inflight));
+    clean = timeline.transitionCleanInflightToComplete(inflight, Option.empty());
+    timeline = timeline.reload();
+    assertTrue(timeline.containsInstant(clean));
+    assertFalse(timeline.containsInstant(inflight));
+
+    // Various states of Instants
+    HoodieInstant srcInstant = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant otherInstant = HoodieTimeline.getRequestedInstant(srcInstant);
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCleanInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.CLEAN_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionRequestedInstant("5");
+    assertEquals(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+    otherInstant = HoodieTimeline.getCompactionInflightInstant("5");
+    assertEquals(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "5"), otherInstant);
+
+    // containsOrBeforeTimelineStarts
+    List<HoodieInstant> allInstants = getAllInstants();
+    timeline = new HoodieActiveTimeline(metaClient, true);
+    // Installed once; the original repeated this call back-to-back with the same list.
+    timeline.setInstants(allInstants);
+    timeline.createNewInstant(new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "2"));
+    allInstants.stream().map(HoodieInstant::getTimestamp)
+        .forEach(s -> assertTrue(timeline.containsOrBeforeTimelineStarts(s)));
+    assertTrue(timeline.containsOrBeforeTimelineStarts("0"));
+    // A timestamp derived from the current wall clock is expected to be neither contained in
+    // the timeline nor before its start.
+    assertFalse(timeline.containsOrBeforeTimelineStarts(String.valueOf(System.currentTimeMillis() + 1000)));
+    assertFalse(timeline.getTimelineHash().isEmpty());
+  }
+
+  /**
+   * Creates every instant returned by {@code getAllInstants()} on the timeline and checks that
+   * all of them are reported as present after a reload.
+   */
+  @Test
+  public void testCreateInstants() {
+    // getAllInstants() also re-initialises the 'timeline' field, so it must run before the
+    // field is used below.
+    List<HoodieInstant> created = getAllInstants();
+    created.forEach(each -> timeline.createNewInstant(each));
+
+    timeline = timeline.reload();
+    created.forEach(each -> assertTrue(timeline.containsInstant(each)));
+  }
+
+  /**
+   * Checks the static file-name helpers on HoodieTimeline: extracting the commit time from an
+   * instant's file name, and converting a file name between its inflight and complete forms.
+   * Assertions use JUnit's (expected, actual) argument order.
+   */
+  @Test
+  public void testInstantFilenameOperations() {
+    HoodieInstant instantRequested = new HoodieInstant(State.REQUESTED, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantInflight = new HoodieInstant(State.INFLIGHT, HoodieTimeline.RESTORE_ACTION, "5");
+    HoodieInstant instantComplete = new HoodieInstant(State.COMPLETED, HoodieTimeline.RESTORE_ACTION, "5");
+
+    // The commit time must be recoverable from the file name in every state.
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantRequested.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantInflight.getFileName()));
+    assertEquals("5", HoodieTimeline.getCommitFromCommitFile(instantComplete.getFileName()));
+
+    assertEquals(instantComplete.getFileName(),
+        HoodieTimeline.makeFileNameAsComplete(instantInflight.getFileName()));
+
+    assertEquals(instantInflight.getFileName(),
+        HoodieTimeline.makeFileNameAsInflight(instantComplete.getFileName()));
+  }
+
+  /**
+   * Checks the reverse-ordered view and the filterXXX views of the timeline against the list
+   * produced by {@code getAllInstants()}.
+   */
+  @Test
+  public void testFiltering() {
+    List<HoodieInstant> allInstants = getAllInstants();
+    Supplier<Stream<HoodieInstant>> sup = allInstants::stream;
+
+    timeline = new HoodieActiveTimeline(metaClient);
+    timeline.setInstants(allInstants);
+
+    // getReverseOrderedInstants
+    List<HoodieInstant> actualReversed = timeline.getReverseOrderedInstants().collect(Collectors.toList());
+    List<HoodieInstant> expectedReversed = sup.get().collect(Collectors.toList());
+    Collections.reverse(expectedReversed);
+    assertEquals(expectedReversed, actualReversed);
+
+    // Helper: every instant whose state is in 'states' must be contained in 'view', and every
+    // other instant must be absent from it. The parameter is deliberately not named 'timeline'
+    // so that it does not shadow the test's timeline field.
+    BiConsumer<HoodieTimeline, Set<State>> checkFilter = (view, states) -> {
+      sup.get().filter(i -> states.contains(i.getState())).forEach(i -> assertTrue(view.containsInstant(i)));
+      sup.get().filter(i -> !states.contains(i.getState())).forEach(i -> assertFalse(view.containsInstant(i)));
+    };
+
+    checkFilter.accept(timeline.filter(i -> false), Sets.newHashSet());
+    checkFilter.accept(timeline.filterInflights(), Sets.newHashSet(State.INFLIGHT));
+    checkFilter.accept(timeline.filterInflightsAndRequested(),
+        Sets.newHashSet(State.INFLIGHT, State.REQUESTED));
+
+    // filterCompletedAndCompactionInstants
+    // This cannot be done using checkFilter as it involves both states and actions
+    final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
+    final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+    final Set<String> actions = Sets.newHashSet(HoodieTimeline.COMPACTION_ACTION);
+    sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
+        .forEach(i -> assertTrue(t1.containsInstant(i)));
+    sup.get().filter(i -> !(states.contains(i.getState()) || actions.contains(i.getAction())))
+        .forEach(i -> assertFalse(t1.containsInstant(i)));
+
+    // filterPendingCompactionTimeline
+    // Actions are compared with equals(): reference comparison of Strings only works while both
+    // sides happen to be the same constant object.
+    final HoodieTimeline t2 = timeline.filterPendingCompactionTimeline();
+    sup.get().filter(i -> HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertTrue(t2.containsInstant(i)));
+    sup.get().filter(i -> !HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
+        .forEach(i -> assertFalse(t2.containsInstant(i)));
+  }
+
+  /**
+   * Builds one HoodieInstant per (state, action) combination, skipping {@code State.INVALID}
+   * and the combinations filtered out below. Timestamps are consecutive zero-padded numbers
+   * ("001", "002", ...) assigned in generation order.
+   *
+   * <p>Side effect: replaces the {@code timeline} field with a new HoodieActiveTimeline.
+   *
+   * @return list of HoodieInstant in generation order
+   */
+  private List<HoodieInstant> getAllInstants() {
+    timeline = new HoodieActiveTimeline(metaClient);
+    List<HoodieInstant> allInstants = new ArrayList<>();
+    long commitTime = 1;
+    for (State state : State.values()) {
+      if (state == State.INVALID) {
+        continue;
+      }
+      for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
+        // Actions are compared with equals() rather than ==, so the checks do not depend on
+        // both sides being the same String object.
+        boolean isRollback = HoodieTimeline.ROLLBACK_ACTION.equals(action);
+
+        // Following are not valid combinations of actions and state so we should
+        // not be generating them.
+        if (state == State.REQUESTED
+            && (HoodieTimeline.SAVEPOINT_ACTION.equals(action)
+                || HoodieTimeline.RESTORE_ACTION.equals(action)
+                || isRollback)) {
+          continue;
+        }
+        // NOTE(review): together with the REQUESTED check above, rollback is skipped for
+        // REQUESTED, INFLIGHT and COMPLETED, so no rollback instant is produced for those
+        // states - confirm this is intended.
+        if ((state == State.INFLIGHT || state == State.COMPLETED) && isRollback) {
+          continue;
+        }
+
+        // Compaction complete is called commit complete
+        String instantAction = (state == State.COMPLETED && HoodieTimeline.COMPACTION_ACTION.equals(action))
+            ? HoodieTimeline.COMMIT_ACTION
+            : action;
+
+        allInstants.add(new HoodieInstant(state, instantAction, String.format("%03d", commitTime++)));
+      }
+    }
+    return allInstants;
+  }
 }

Reply via email to