This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ed251  [HUDI-571] Add show archived compaction(s) to CLI
20ed251 is described below

commit 20ed2516d38b9ce4b3e185bd89b62264b8bd3f25
Author: Satish Kotha <satishko...@uber.com>
AuthorDate: Wed Jan 22 13:50:34 2020 -0800

    [HUDI-571] Add show archived compaction(s) to CLI
---
 .../apache/hudi/cli/commands/CommitsCommand.java   |  13 +-
 .../hudi/cli/commands/CompactionCommand.java       | 247 ++++++++++++++++-----
 .../java/org/apache/hudi/cli/utils/CommitUtil.java |  23 ++
 3 files changed, 216 insertions(+), 67 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 1e17c4c..804096b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli.commands;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -41,10 +42,8 @@ import org.springframework.shell.core.annotation.CliOption;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -186,10 +185,10 @@ public class CommitsCommand implements CommandMarker {
           final boolean headerOnly)
           throws IOException {
     if (StringUtils.isNullOrEmpty(startTs)) {
-      startTs = getTimeDaysAgo(10);
+      startTs = CommitUtil.getTimeDaysAgo(10);
     }
     if (StringUtils.isNullOrEmpty(endTs)) {
-      endTs = getTimeDaysAgo(1);
+      endTs = CommitUtil.getTimeDaysAgo(1);
     }
     HoodieArchivedTimeline archivedTimeline = 
HoodieCLI.getTableMetaClient().getArchivedTimeline();
     try {
@@ -362,10 +361,4 @@ public class CommitsCommand implements CommandMarker {
     return "Load sync state between " + 
HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
   }
-
-  private String getTimeDaysAgo(int numberOfDays) {
-    Date date = 
Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
-    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
-  }
-
 }
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 0b57947..2564931 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -26,16 +26,20 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.AvroUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.func.OperationResult;
@@ -61,8 +65,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * CLI command to display compaction related options.
@@ -95,51 +101,9 @@ public class CompactionCommand implements CommandMarker {
       throws IOException {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
-    HoodieTimeline commitTimeline = 
activeTimeline.getCommitTimeline().filterCompletedInstants();
-    Set<String> committed = 
commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
-    List<HoodieInstant> instants = 
timeline.getReverseOrderedInstants().collect(Collectors.toList());
-    List<Comparable[]> rows = new ArrayList<>();
-    for (HoodieInstant instant : instants) {
-      HoodieCompactionPlan compactionPlan = null;
-      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
-        try {
-          // This could be a completed compaction. Assume a compaction request 
file is present but skip if fails
-          compactionPlan = AvroUtils.deserializeCompactionPlan(
-              activeTimeline.readCompactionPlanAsBytes(
-                  
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-        } catch (HoodieIOException ioe) {
-          // SKIP
-        }
-      } else {
-        compactionPlan = 
AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
-            
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
-      }
-
-      if (null != compactionPlan) {
-        State state = instant.getState();
-        if (committed.contains(instant.getTimestamp())) {
-          state = State.COMPLETED;
-        }
-        if (includeExtraMetadata) {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : 
compactionPlan.getOperations().size(),
-              compactionPlan.getExtraMetadata().toString()});
-        } else {
-          rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
-              compactionPlan.getOperations() == null ? 0 : 
compactionPlan.getOperations().size()});
-        }
-      }
-    }
-
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Compaction 
Instant Time").addTableHeaderField("State")
-        .addTableHeaderField("Total FileIds to be Compacted");
-    if (includeExtraMetadata) {
-      header = header.addTableHeaderField("Extra Metadata");
-    }
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+    return printAllCompactions(activeTimeline,
+            compactionPlanReader(this::readCompactionPlanForActiveTimeline, 
activeTimeline),
+            includeExtraMetadata, sortByField, descending, limit, headerOnly);
   }
 
   @CliCommand(value = "compaction show", help = "Shows compaction details for 
a specific compaction instant")
@@ -159,19 +123,68 @@ public class CompactionCommand implements CommandMarker {
         activeTimeline.readCompactionPlanAsBytes(
             
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
 
-    List<Comparable[]> rows = new ArrayList<>();
-    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
-      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
-        rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), 
op.getBaseInstantTime(), op.getDataFilePath(),
-            op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : 
op.getMetrics().toString()});
-      }
+    return printCompaction(compactionPlan, sortByField, descending, limit, 
headerOnly);
+  }
+
+  @CliCommand(value = "compactions show archived", help = "Shows compaction 
details for specified time window")
+  public String compactionShowArchived(
+          @CliOption(key = {"includeExtraMetadata"}, help = "Include extra 
metadata",
+                  unspecifiedDefaultValue = "false") final boolean 
includeExtraMetadata,
+          @CliOption(key = {"startTs"},  mandatory = false, help = "start time 
for compactions, default: now - 10 days")
+                  String startTs,
+          @CliOption(key = {"endTs"},  mandatory = false, help = "end time for 
compactions, default: now - 1 day")
+                  String endTs,
+          @CliOption(key = {"limit"}, help = "Limit compactions",
+                  unspecifiedDefaultValue = "-1") final Integer limit,
+          @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+          @CliOption(key = {"desc"}, help = "Ordering", 
unspecifiedDefaultValue = "false") final boolean descending,
+          @CliOption(key = {"headeronly"}, help = "Print Header Only",
+                  unspecifiedDefaultValue = "false") final boolean headerOnly)
+          throws Exception {
+    if (StringUtils.isNullOrEmpty(startTs)) {
+      startTs = CommitUtil.getTimeDaysAgo(10);
+    }
+    if (StringUtils.isNullOrEmpty(endTs)) {
+      endTs = CommitUtil.getTimeDaysAgo(1);
     }
 
-    Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Partition 
Path").addTableHeaderField("File Id")
-        .addTableHeaderField("Base Instant").addTableHeaderField("Data File 
Path")
-        .addTableHeaderField("Total Delta 
Files").addTableHeaderField("getMetrics");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+    HoodieTableMetaClient client = checkAndGetMetaClient();
+    HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+    archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+    try {
+      return printAllCompactions(archivedTimeline,
+              
compactionPlanReader(this::readCompactionPlanForArchivedTimeline, 
archivedTimeline),
+              includeExtraMetadata, sortByField, descending, limit, 
headerOnly);
+    } finally {
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+    }
+  }
+
+  @CliCommand(value = "compaction show archived", help = "Shows compaction 
details for a specific compaction instant")
+  public String compactionShowArchived(
+          @CliOption(key = "instant", mandatory = true,
+                  help = "instant time") final String compactionInstantTime,
+          @CliOption(key = {"limit"}, help = "Limit commits",
+                  unspecifiedDefaultValue = "-1") final Integer limit,
+          @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+          @CliOption(key = {"desc"}, help = "Ordering", 
unspecifiedDefaultValue = "false") final boolean descending,
+          @CliOption(key = {"headeronly"}, help = "Print Header Only",
+                  unspecifiedDefaultValue = "false") final boolean headerOnly)
+          throws Exception {
+    HoodieTableMetaClient client = checkAndGetMetaClient();
+    HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+    HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
+            HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
+    String startTs = CommitUtil.addHours(compactionInstantTime, -1);
+    String endTs = CommitUtil.addHours(compactionInstantTime, 1);
+    try {
+      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+      HoodieCompactionPlan compactionPlan = 
AvroUtils.deserializeCompactionPlan(
+              archivedTimeline.getInstantDetails(instant).get());
+      return printCompaction(compactionPlan, sortByField, descending, limit, 
headerOnly);
+    } finally {
+      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+    }
   }
 
   @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
@@ -249,6 +262,126 @@ public class CompactionCommand implements CommandMarker {
     return "Compaction successfully completed for " + compactionInstantTime;
   }
 
+  /**
+   * Prints all compaction details.
+   */
+  private String printAllCompactions(HoodieDefaultTimeline timeline,
+                                     Function<HoodieInstant, 
HoodieCompactionPlan> compactionPlanReader,
+                                     boolean includeExtraMetadata,
+                                     String sortByField,
+                                     boolean descending,
+                                     int limit,
+                                     boolean headerOnly) {
+
+    Stream<HoodieInstant> instantsStream = 
timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
+    List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = 
instantsStream
+            .map(instant -> Pair.of(instant, 
compactionPlanReader.apply(instant)))
+            .filter(pair -> pair.getRight() != null)
+            .collect(Collectors.toList());
+
+    Set<HoodieInstant> committedInstants = 
timeline.getCommitTimeline().filterCompletedInstants()
+            .getInstants().collect(Collectors.toSet());
+
+    List<Comparable[]> rows = new ArrayList<>();
+    for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : 
compactionPlans) {
+      HoodieCompactionPlan plan = compactionPlan.getRight();
+      HoodieInstant instant = compactionPlan.getLeft();
+      final HoodieInstant.State state;
+      if (committedInstants.contains(instant)) {
+        state = HoodieInstant.State.COMPLETED;
+      } else {
+        state = instant.getState();
+      }
+
+      if (includeExtraMetadata) {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+                plan.getOperations() == null ? 0 : plan.getOperations().size(),
+                plan.getExtraMetadata().toString()});
+      } else {
+        rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+                plan.getOperations() == null ? 0 : 
plan.getOperations().size()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction 
Instant Time").addTableHeaderField("State")
+            .addTableHeaderField("Total FileIds to be Compacted");
+    if (includeExtraMetadata) {
+      header = header.addTableHeaderField("Extra Metadata");
+    }
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Reading a compaction plan differs between timelines. Creates a partial 
function that binds the timeline-specific reader.
+   * These read methods could be made part of HoodieDefaultTimeline and overridden 
where necessary, but the
+   * BiFunction below has 'hacky' exception blocks, so it is restricted to the CLI.
+   */
+  private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends 
HoodieCompactionPlan>
+      Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
+          BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
+
+    return (y) -> f.apply(timeline, y);
+  }
+
+  private HoodieCompactionPlan 
readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
+                                                                     
HoodieInstant instant) {
+    if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+      return null;
+    } else {
+      try {
+        return 
AvroUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * TBD: Can we make this part of HoodieActiveTimeline or a utility class?
+   */
+  private HoodieCompactionPlan 
readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline,
+                                                                   
HoodieInstant instant) {
+    try {
+      if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+        try {
+          // This could be a completed compaction. Assume a compaction request 
file is present but skip if fails
+          return AvroUtils.deserializeCompactionPlan(
+                  activeTimeline.readCompactionPlanAsBytes(
+                          
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+        } catch (HoodieIOException ioe) {
+          // SKIP
+          return null;
+        }
+      } else {
+        return 
AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
+                
HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  private String printCompaction(HoodieCompactionPlan compactionPlan,
+                                 String sortByField,
+                                 boolean descending,
+                                 int limit,
+                                 boolean headerOnly) {
+    List<Comparable[]> rows = new ArrayList<>();
+    if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
+      for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
+        rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), 
op.getBaseInstantTime(), op.getDataFilePath(),
+                op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : 
op.getMetrics().toString()});
+      }
+    }
+
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Partition 
Path").addTableHeaderField("File Id")
+            .addTableHeaderField("Base Instant").addTableHeaderField("Data 
File Path")
+            .addTableHeaderField("Total Delta 
Files").addTableHeaderField("getMetrics");
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+  }
+
   private static String getTmpSerializerFile() {
     return TMP_DIR + UUID.randomUUID().toString() + ".ser";
   }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index aaaeb35..60d3e3e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -21,9 +21,15 @@ package org.apache.hudi.cli.utils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
 import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -42,4 +48,21 @@ public class CommitUtil {
     }
     return totalNew;
   }
+
+  public static String getTimeDaysAgo(int numberOfDays) {
+    Date date = 
Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+    return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+  }
+
+  /**
+   * Adds hours to the specified time. If hours < 0, this subtracts hours instead.
+   * For example, with compactionCommitTime "20200202020000":
+   *  a) hours: +1, returns 20200202030000
+   *  b) hours: -1, returns 20200202010000
+   */
+  public static String addHours(String compactionCommitTime, int hours) throws 
ParseException {
+    Instant instant = 
HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, 
ZoneId.systemDefault());
+    return 
HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
+  }
 }

Reply via email to