This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 462fd02  [HUDI-571] Add 'commits show archived' command to CLI
462fd02 is described below

commit 462fd025563b0ae8a4d4f28d366a9bbfca070d3f
Author: Satish Kotha <satishko...@uber.com>
AuthorDate: Wed Jan 22 13:50:34 2020 -0800

    [HUDI-571] Add 'commits show archived' command to CLI
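
    A minimal usage sketch of the new command (an illustrative example, assuming
    the standard hudi-cli shell with a table already connected; the timestamp
    values are made up):

        commits show archived --startTs 20200110000000 --endTs 20200120000000 --limit 10

    When --startTs/--endTs are omitted, the command defaults to the window from
    (now - 10 days) to (now - 1 day), as implemented below.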
---
 .../apache/hudi/cli/commands/CommitsCommand.java   | 105 +++++++++--
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java |  73 ++++----
 .../apache/hudi/common/model/HoodieWriteStat.java  |   3 +-
 .../apache/hudi/common/table/HoodieTimeline.java   |   8 +
 .../table/timeline/HoodieActiveTimeline.java       |  89 ----------
 .../table/timeline/HoodieArchivedTimeline.java     | 192 ++++++++++++++++++---
 .../table/timeline/HoodieDefaultTimeline.java      |  81 ++++++++-
 .../common/table/TestHoodieTableMetaClient.java    |  35 ----
 8 files changed, 385 insertions(+), 201 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index c0f8ead..3a11e58 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -28,9 +28,12 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.NumericUtils;
 
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.spark.launcher.SparkLauncher;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
@@ -38,7 +41,10 @@ import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,6 +57,49 @@ import java.util.stream.Collectors;
 @Component
 public class CommitsCommand implements CommandMarker {
 
+  private String printCommits(HoodieDefaultTimeline timeline,
+                              final Integer limit, final String sortByField,
+                              final boolean descending,
+                              final boolean headerOnly) throws IOException {
+    final List<Comparable[]> rows = new ArrayList<>();
+
+    final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
+            .getInstants().collect(Collectors.toList());
+    // timeline can be read from multiple files. So sort is needed instead of reversing the collection
+    Collections.sort(commits, HoodieInstant.COMPARATOR.reversed());
+
+    for (int i = 0; i < commits.size(); i++) {
+      final HoodieInstant commit = commits.get(i);
+      final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+              timeline.getInstantDetails(commit).get(),
+              HoodieCommitMetadata.class);
+      rows.add(new Comparable[]{commit.getTimestamp(),
+              commitMetadata.fetchTotalBytesWritten(),
+              commitMetadata.fetchTotalFilesInsert(),
+              commitMetadata.fetchTotalFilesUpdated(),
+              commitMetadata.fetchTotalPartitionsWritten(),
+              commitMetadata.fetchTotalRecordsWritten(),
+              commitMetadata.fetchTotalUpdateRecordsWritten(),
+              commitMetadata.fetchTotalWriteErrors()});
+    }
+
+    final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    fieldNameToConverterMap.put("Total Bytes Written", entry -> {
+      return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString())));
+    });
+
+    final TableHeader header = new TableHeader()
+            .addTableHeaderField("CommitTime")
+            .addTableHeaderField("Total Bytes Written")
+            .addTableHeaderField("Total Files Added")
+            .addTableHeaderField("Total Files Updated")
+            .addTableHeaderField("Total Partitions Written")
+            .addTableHeaderField("Total Records Written")
+            .addTableHeaderField("Total Update Records Written")
+            .addTableHeaderField("Total Errors");
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  }
+
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
       @CliOption(key = {"limit"}, help = "Limit commits",
@@ -62,26 +111,39 @@ public class CommitsCommand implements CommandMarker {
       throws IOException {
 
     HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList());
-    List<Comparable[]> rows = new ArrayList<>();
-    for (HoodieInstant commit : commits) {
-      HoodieCommitMetadata commitMetadata =
-          HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class);
-      rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(),
-          commitMetadata.fetchTotalFilesInsert(), commitMetadata.fetchTotalFilesUpdated(),
-          commitMetadata.fetchTotalPartitionsWritten(), commitMetadata.fetchTotalRecordsWritten(),
-          commitMetadata.fetchTotalUpdateRecordsWritten(), commitMetadata.fetchTotalWriteErrors()});
-    }
-
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString()))));
+    return printCommits(activeTimeline, limit, sortByField, descending, headerOnly);
+  }
 
-    TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written")
-        .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated")
-        .addTableHeaderField("Total Partitions Written").addTableHeaderField("Total Records Written")
-        .addTableHeaderField("Total Update Records Written").addTableHeaderField("Total Errors");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+  @CliCommand(value = "commits show archived", help = "Show the archived commits")
+  public String showArchivedCommits(
+          @CliOption(key = {"startTs"},  mandatory = false, help = "start time 
for commits, default: now - 10 days")
+          String startTs,
+          @CliOption(key = {"endTs"},  mandatory = false, help = "end time for 
commits, default: now - 1 day")
+          String endTs,
+          @CliOption(key = {"limit"}, mandatory = false, help = "Limit 
commits", unspecifiedDefaultValue = "-1")
+          final Integer limit,
+          @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "")
+          final String sortByField,
+          @CliOption(key = {"desc"}, help = "Ordering", 
unspecifiedDefaultValue = "false")
+          final boolean descending,
+          @CliOption(key = {"headeronly"}, help = "Print Header Only", 
unspecifiedDefaultValue = "false")
+          final boolean headerOnly)
+          throws IOException {
+    if (StringUtils.isNullOrEmpty(startTs)) {
+      startTs = getTimeDaysAgo(10);
+    }
+    if (StringUtils.isNullOrEmpty(endTs)) {
+      endTs = getTimeDaysAgo(1);
+    }
+    HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
+    try {
+      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+      return printCommits(archivedTimeline.findInstantsInRange(startTs, endTs),
+              limit, sortByField, descending, headerOnly);
+    } finally {
+      // clear the instant details from memory after printing to reduce usage
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+    }
   }
 
   @CliCommand(value = "commits refresh", help = "Refresh the commits")
@@ -241,4 +303,9 @@ public class CommitsCommand implements CommandMarker {
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
   }
 
+  private String getTimeDaysAgo(int numberOfDays) {
+    Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+  }
+
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index c0fb1ad..eccbc7a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -19,24 +19,18 @@
 package org.apache.hudi.io;
 
 import org.apache.hudi.HoodieClientTestHarness;
-import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.common.HoodieTestDataGenerator;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.google.common.collect.Sets;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.After;
@@ -44,7 +38,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -197,35 +192,18 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105")));
 
     // read the file
-    Reader reader =
-        HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
-            HoodieArchivedMetaEntry.getClassSchema());
-    int archivedRecordsCount = 0;
-    List<IndexedRecord> readRecords = new ArrayList<>();
-    // read the avro blocks and validate the number of records written in each avro block
-    int numBlocks = 0;
-    while (reader.hasNext()) {
-      HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
-      List<IndexedRecord> records = blk.getRecords();
-      readRecords.addAll(records);
-      archivedRecordsCount += records.size();
-      numBlocks++;
-    }
-    System.out.println("Read Records :" + readRecords.stream().map(r -> 
(GenericRecord) r)
-        .map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" + 
r.get("commitTime")).collect(Collectors.toList()));
-    assertEquals("Total archived records and total read records are the same 
count", 24, archivedRecordsCount);
-    assertTrue("Average Archived records per block is greater than 1", 
archivedRecordsCount / numBlocks > 1);
-    // make sure the archived commits are the same as the (originalcommits - 
commitsleft)
-    Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord) 
r).map(r -> {
-      return r.get("commitTime").toString();
-    }).collect(Collectors.toSet());
+    HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient);
+    assertEquals("Total archived records and total read records are the same count",
+            24, archivedTimeline.countInstants());
 
+    // make sure the archived commits are the same as the (originalCommits - commitsLeft)
+    Set<String> readCommits =
+            archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
     assertEquals("Read commits map should match the originalCommits - commitsLoadedFromArchival",
-        originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
+            originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits);
 
     // verify in-flight instants after archive
     verifyInflightInstants(metaClient, 2);
-    reader.close();
   }
 
   @Test
@@ -397,6 +375,37 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107")));
   }
 
+  @Test
+  public void checkArchiveCommitTimeline() throws IOException, InterruptedException {
+    HoodieWriteConfig cfg =
+            HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+                    .withParallelism(2, 2).forTable("test-trip-table")
+                    .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+                    .build();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
+
+    HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf());
+    HoodieInstant instant1 = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "1");
+    HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf());
+    HoodieInstant instant2 = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "2");
+    HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf());
+    HoodieInstant instant3 = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "3");
+
+    //add 2 more instants to pass filter criteria set in compaction config 
above
+    HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf());
+    HoodieInstant instant4 = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "4");
+    HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf());
+    HoodieInstant instant5 = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "5");
+
+    boolean result = archiveLog.archiveIfRequired(jsc);
+    assertTrue(result);
+
+    HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
+    List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
+    assertEquals(new HashSet(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
+  }
+
   private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) {
     HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
         .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 0135dbe..97288df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -135,6 +135,7 @@ public class HoodieWriteStat implements Serializable {
   /**
    * Total number of rollback blocks seen in a compaction operation.
    */
+  @Nullable
   private long totalRollbackBlocks;
 
   /**
@@ -290,7 +291,7 @@ public class HoodieWriteStat implements Serializable {
     return totalRollbackBlocks;
   }
 
-  public void setTotalRollbackBlocks(Long totalRollbackBlocks) {
+  public void setTotalRollbackBlocks(long totalRollbackBlocks) {
     this.totalRollbackBlocks = totalRollbackBlocks;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
index 015a497..575a9ea 100755
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java
@@ -234,6 +234,14 @@ public interface HoodieTimeline extends Serializable {
     return predicateToApply.test(commit1, commit2);
   }
 
+  /**
+   * Return true if specified timestamp is in range (startTs, endTs].
+   */
+  static boolean isInRange(String timestamp, String startTs, String endTs) {
+    return HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER)
+            && HoodieTimeline.compareTimestamps(timestamp, endTs, LESSER_OR_EQUAL);
+  }
+
   static HoodieInstant getCompletedInstant(final HoodieInstant instant) {
     return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp());
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index f322d47..e5829f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +44,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -134,93 +132,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     in.defaultReadObject();
   }
 
-  /**
-   * Get all instants (commits, delta commits) that produce new data, in the active timeline.
-   */
-  public HoodieTimeline getCommitsTimeline() {
-    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
-  }
-
-  /**
-   * Get all instants (commits, delta commits, in-flight/request compaction) that produce new data, in the active
-   * timeline * With Async compaction a requested/inflight compaction-instant is a valid baseInstant for a file-slice as
-   * there could be delta-commits with that baseInstant.
-   */
-  @Override
-  public HoodieTimeline getCommitsAndCompactionTimeline() {
-    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION));
-  }
-
-  /**
-   * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active
-   * timeline.
-   */
-  public HoodieTimeline getAllCommitsTimeline() {
-    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
-        SAVEPOINT_ACTION, ROLLBACK_ACTION));
-  }
-
-  /**
-   * Get only pure commits (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getCommitTimeline() {
-    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
-  }
-
-  /**
-   * Get only the delta commits (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getDeltaCommitTimeline() {
-    return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  /**
-   * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions.
-   *
-   * @param actions actions allowed in the timeline
-   */
-  public HoodieTimeline getTimelineOfActions(Set<String> actions) {
-    return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  /**
-   * Get only the cleaner action (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getCleanerTimeline() {
-    return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  /**
-   * Get only the rollback action (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getRollbackTimeline() {
-    return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  /**
-   * Get only the save point action (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getSavePointTimeline() {
-    return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  /**
-   * Get only the restore action (inflight and completed) in the active timeline.
-   */
-  public HoodieTimeline getRestoreTimeline() {
-    return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
-        (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
-  }
-
-  protected Stream<HoodieInstant> filterInstantsByAction(String action) {
-    return getInstants().filter(s -> s.getAction().equals(action));
-  }
-
   public void createNewInstant(HoodieInstant instant) {
     LOG.info("Creating a new instant " + instant);
     // Create the in-flight file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 4e45925..a2ad80c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,24 +18,36 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the
@@ -49,34 +61,27 @@ import java.util.stream.Collectors;
  * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
  */
 public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
+  private static final Pattern ARCHIVE_FILE_PATTERN =
+          Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$");
 
-  private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits";
+  private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
+  private static final String ACTION_TYPE_KEY = "actionType";
   private HoodieTableMetaClient metaClient;
   private Map<String, byte[]> readCommits = new HashMap<>();
 
   private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
 
+  /**
+   * Loads instants between (startTs, endTs].
+   * Note that there is no lazy loading, so this may not work if a really long time range (endTs - startTs) is specified.
+   * TBD: Should we enforce maximum time range?
+   */
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
-    // Read back the commits to make sure
-    Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
-    try (SequenceFile.Reader reader =
-        new SequenceFile.Reader(metaClient.getHadoopConf(), SequenceFile.Reader.file(archiveLogPath))) {
-      Text key = new Text();
-      Text val = new Text();
-      while (reader.next(key, val)) {
-        // TODO - limit the number of commits loaded in memory. this could get very large.
-        // This is okay because only tooling will load the archived commit timeline today
-        readCommits.put(key.toString(), Arrays.copyOf(val.getBytes(), val.getLength()));
-      }
-      this.setInstants(readCommits.keySet().stream().map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s))
-          .collect(Collectors.toList()));
-    } catch (IOException e) {
-      throw new HoodieIOException("Could not load archived commit timeline from path " + archiveLogPath, e);
-    }
+    this.metaClient = metaClient;
+    setInstants(this.loadInstants(false));
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
-    this.metaClient = metaClient;
   }
 
   /**
@@ -96,7 +101,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   }
 
   public static Path getArchiveLogPath(String archiveFolder) {
-    return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE);
+    return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX);
+  }
+
+  public void loadInstantDetailsInMemory(String startTs, String endTs) {
+    loadInstants(startTs, endTs);
+  }
+
+  public void clearInstantDetailsFromMemory(String startTs, String endTs) {
+    this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
+            this.readCommits.remove(instant.getTimestamp()));
   }
 
   @Override
@@ -108,4 +122,136 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     return new HoodieArchivedTimeline(metaClient);
   }
 
+  private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
+    final String commitTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
+    final String action = record.get(ACTION_TYPE_KEY).toString();
+    if (loadDetails) {
+      Option.ofNullable(record.get(getMetadataKey(action))).map(actionData ->
+              this.readCommits.put(commitTime, actionData.toString().getBytes(StandardCharsets.UTF_8))
+      );
+    }
+    return new HoodieInstant(false, action, commitTime);
+  }
+
+  private String getMetadataKey(String action) {
+    switch (action) {
+      case HoodieTimeline.CLEAN_ACTION:
+        return "hoodieCleanMetadata";
+      case HoodieTimeline.COMMIT_ACTION:
+        return "hoodieCommitMetadata";
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+        return "hoodieCommitMetadata";
+      case HoodieTimeline.ROLLBACK_ACTION:
+        return "hoodieRollbackMetadata";
+      case HoodieTimeline.SAVEPOINT_ACTION:
+        return "hoodieSavePointMetadata";
+      default:
+        throw new HoodieIOException("Unknown action in metadata " + action);
+    }
+  }
+
+  private List<HoodieInstant> loadInstants(boolean loadInstantDetails) {
+    return loadInstants(null, loadInstantDetails);
+  }
+
+  private List<HoodieInstant> loadInstants(String startTs, String endTs) {
+    return loadInstants(new TimeRangeFilter(startTs, endTs), true);
+  }
+
+  /**
+   * This method reads selected instants. Do NOT use this directly; use one of the helper methods above.
+   * If loadInstantDetails is set to true, this also updates the 'readCommits' map with commit details.
+   * If a filter is specified, only the filtered instants are loaded.
+   */
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
+    try {
+      // list all files
+      FileStatus[] fsStatuses = metaClient.getFs().globStatus(
+              new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
+
+      // sort files by version suffix in reverse (implies reverse chronological order)
+      Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
+
+      List<HoodieInstant> instantsInRange = new ArrayList<>();
+      for (FileStatus fs : fsStatuses) {
+        //read the archived file
+        HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
+                new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema());
+        try {
+          int instantsInPreviousFile = instantsInRange.size();
+          //read the avro blocks
+          while (reader.hasNext()) {
+            HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
+            // TODO If we can store additional metadata in datablock, we can skip parsing records
+            // (such as startTime, endTime of records in the block)
+            List<IndexedRecord> records = blk.getRecords();
+            // filter blocks in desired time window
+            Stream<HoodieInstant> instantsInBlkStream = records.stream()
+                    .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
+
+            if (filter != null) {
+              instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange);
+            }
+
+            instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList()));
+          }
+
+          if (filter != null) {
+            int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile;
+            if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) {
+              // Note that this is an optimization to skip reading unnecessary archived files
+              // This signals we crossed lower bound of desired time window.
+              // This signals we crossed lower bound of desired time window.
+              break;
+            }
+          }
+        } finally {
+          reader.close();
+        }
+      }
+
+      return instantsInRange;
+    } catch (IOException e) {
+      throw new HoodieIOException(
+              "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);
+    }
+  }
+
+  private static class TimeRangeFilter {
+    private final String startTs;
+    private final String endTs;
+
+    public TimeRangeFilter(String startTs, String endTs) {
+      this.startTs = startTs;
+      this.endTs = endTs;
+    }
+
+    public boolean isInRange(HoodieInstant instant) {
+      return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs, this.endTs);
+    }
+  }
+
+  /**
+   * Sort files by reverse order of version suffix in file name.
+   */
+  public static class ArchiveFileVersionComparator implements Comparator<FileStatus>, Serializable {
+    @Override
+    public int compare(FileStatus f1, FileStatus f2) {
+      return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1));
+    }
+
+    private int getArchivedFileSuffix(FileStatus f) {
+      try {
+        Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName());
+        if (fileMatcher.matches()) {
+          return Integer.parseInt(fileMatcher.group(1));
+        }
+      } catch (NumberFormatException e) {
+        // log and ignore any format warnings
+        LOG.warn("error getting suffix for archived file: " + f.getPath());
+      }
+
+      // return default value in case of any errors
+      return 0;
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index 78d6c6f..9f06629 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.io.Serializable;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.List;
@@ -126,8 +127,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
   @Override
   public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) {
     return new HoodieDefaultTimeline(
-        instants.stream().filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), startTs, GREATER)
-            && HoodieTimeline.compareTimestamps(s.getTimestamp(), endTs, LESSER_OR_EQUAL)),
+        instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)),
         details);
   }
 
@@ -143,6 +143,83 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return new HoodieDefaultTimeline(instants.stream().filter(filter), details);
   }
 
+  /**
+   * Get all instants (commits, delta commits) that produce new data, in the active timeline.
+   */
+  public HoodieTimeline getCommitsTimeline() {
+    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION));
+  }
+
+  /**
+   * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active
+   * timeline.
+   */
+  public HoodieTimeline getAllCommitsTimeline() {
+    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION,
+            SAVEPOINT_ACTION, ROLLBACK_ACTION));
+  }
+
+  /**
+   * Get only pure commits (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getCommitTimeline() {
+    return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION));
+  }
+
+  /**
+   * Get only the delta commits (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getDeltaCommitTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  /**
+   * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions.
+   *
+   * @param actions actions allowed in the timeline
+   */
+  public HoodieTimeline getTimelineOfActions(Set<String> actions) {
+    return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  /**
+   * Get only the cleaner action (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getCleanerTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  /**
+   * Get only the rollback action (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getRollbackTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  /**
+   * Get only the save point action (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getSavePointTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  /**
+   * Get only the restore action (inflight and completed) in the active timeline.
+   */
+  public HoodieTimeline getRestoreTimeline() {
+    return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION),
+            (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
+  }
+
+  protected Stream<HoodieInstant> filterInstantsByAction(String action) {
+    return getInstants().filter(s -> s.getAction().equals(action));
+  }
+
   @Override
   public boolean empty() {
     return !instants.stream().findFirst().isPresent();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index 6864623..8b9f643 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -21,20 +21,13 @@ package org.apache.hudi.common.table;
 import org.apache.hudi.common.HoodieCommonTestHarness;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -100,32 +93,4 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
     assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(),
         activeCommitTimeline.getInstantDetails(completedInstant).get());
   }
-
-  @Test
-  public void checkArchiveCommitTimeline() throws IOException {
-    Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
-    SequenceFile.Writer writer =
-        SequenceFile.createWriter(metaClient.getHadoopConf(), SequenceFile.Writer.file(archiveLogPath),
-            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
-
-    writer.append(new Text("1"), new Text("data1"));
-    writer.append(new Text("2"), new Text("data2"));
-    writer.append(new Text("3"), new Text("data3"));
-
-    IOUtils.closeStream(writer);
-
-    HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
-
-    HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1");
-    HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2");
-    HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3");
-
-    assertEquals(Arrays.asList(instant1, instant2, instant3),
-        archivedTimeline.getInstants().collect(Collectors.toList()));
-
-    assertArrayEquals(new Text("data1").getBytes(), 
archivedTimeline.getInstantDetails(instant1).get());
-    assertArrayEquals(new Text("data2").getBytes(), 
archivedTimeline.getInstantDetails(instant2).get());
-    assertArrayEquals(new Text("data3").getBytes(), 
archivedTimeline.getInstantDetails(instant3).get());
-  }
-
 }
