This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a34318  [HUDI-1746] Added support for replace commits in commit 
showpartitions, commit show_write_stats, commit showfiles (#2678)
4a34318 is described below

commit 4a3431866d995fd877a067967457591a0bc8847d
Author: jsbali <jagmeet.b...@gmail.com>
AuthorDate: Wed Apr 21 23:01:35 2021 +0530

    [HUDI-1746] Added support for replace commits in commit showpartitions, 
commit show_write_stats, commit showfiles (#2678)
    
    * Added support for replace commits in commit showpartitions, commit 
show_write_stats, commit showfiles
    
    * Adding CR changes
    
    * [HUDI-1746] Code review changes
---
 .../apache/hudi/cli/commands/CommitsCommand.java   |  79 ++++++++++----
 .../hudi/cli/commands/TestCommitsCommand.java      | 120 ++++++++++++++++++++-
 .../HoodieTestCommitMetadataGenerator.java         |  21 ++--
 .../HoodieTestReplaceCommitMetadatGenerator.java   |  92 ++++++++++++++++
 4 files changed, 282 insertions(+), 30 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 3e216b4..9517234 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -26,6 +26,7 @@ import org.apache.hudi.cli.utils.CommitUtil;
 import org.apache.hudi.cli.utils.InputStreamConsumer;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -34,6 +35,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.NumericUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
 import org.apache.spark.launcher.SparkLauncher;
@@ -44,6 +46,7 @@ import org.springframework.stereotype.Component;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -266,15 +269,18 @@ public class CommitsCommand implements CommandMarker {
 
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, instantTime);
 
-    if (!timeline.containsInstant(commitInstant)) {
+    Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, 
instantTime);
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+    if (!commitMetadataOptional.isPresent()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
-    HoodieCommitMetadata meta = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
-        HoodieCommitMetadata.class);
+
+    HoodieCommitMetadata meta = commitMetadataOptional.get();
     List<Comparable[]> rows = new ArrayList<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : 
meta.getPartitionToWriteStats().entrySet()) {
+      String action = hoodieInstantOption.get().getAction();
       String path = entry.getKey();
       List<HoodieWriteStat> stats = entry.getValue();
       long totalFilesAdded = 0;
@@ -294,7 +300,7 @@ public class CommitsCommand implements CommandMarker {
         totalBytesWritten += stat.getTotalWriteBytes();
         totalWriteErrors += stat.getTotalWriteErrors();
       }
-      rows.add(new Comparable[] {path, totalFilesAdded, totalFilesUpdated, 
totalRecordsInserted, totalRecordsUpdated,
+      rows.add(new Comparable[] {action, path, totalFilesAdded, 
totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated,
           totalBytesWritten, totalWriteErrors});
     }
 
@@ -302,7 +308,8 @@ public class CommitsCommand implements CommandMarker {
     
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, 
entry ->
         
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
@@ -328,24 +335,29 @@ public class CommitsCommand implements CommandMarker {
 
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, instantTime);
 
-    if (!timeline.containsInstant(commitInstant)) {
+    Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, 
instantTime);
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+    if (!commitMetadataOptional.isPresent()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
-    HoodieCommitMetadata meta = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
-        HoodieCommitMetadata.class);
+
+    HoodieCommitMetadata meta = commitMetadataOptional.get();
+
+    String action = hoodieInstantOption.get().getAction();
     long recordsWritten = meta.fetchTotalRecordsWritten();
     long bytesWritten = meta.fetchTotalBytesWritten();
     long avgRecSize = (long) Math.ceil((1.0 * bytesWritten) / recordsWritten);
     List<Comparable[]> rows = new ArrayList<>();
-    rows.add(new Comparable[] {bytesWritten, recordsWritten, avgRecSize});
+    rows.add(new Comparable[] {action, bytesWritten, recordsWritten, 
avgRecSize});
 
     Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
     
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, 
entry ->
         
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT)
+    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN_COMMIT)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_AVG_REC_SIZE_COMMIT);
 
@@ -367,24 +379,28 @@ public class CommitsCommand implements CommandMarker {
 
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
     HoodieTimeline timeline = 
activeTimeline.getCommitsTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, instantTime);
 
-    if (!timeline.containsInstant(commitInstant)) {
+    Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, 
instantTime);
+    Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(timeline, hoodieInstantOption);
+
+    if (!commitMetadataOptional.isPresent()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
     }
-    HoodieCommitMetadata meta = 
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(),
-        HoodieCommitMetadata.class);
+
+    HoodieCommitMetadata meta = commitMetadataOptional.get();
     List<Comparable[]> rows = new ArrayList<>();
     for (Map.Entry<String, List<HoodieWriteStat>> entry : 
meta.getPartitionToWriteStats().entrySet()) {
+      String action = hoodieInstantOption.get().getAction();
       String path = entry.getKey();
       List<HoodieWriteStat> stats = entry.getValue();
       for (HoodieWriteStat stat : stats) {
-        rows.add(new Comparable[] {path, stat.getFileId(), 
stat.getPrevCommit(), stat.getNumUpdateWrites(),
+        rows.add(new Comparable[] {action, path, stat.getFileId(), 
stat.getPrevCommit(), stat.getNumUpdateWrites(),
             stat.getNumWrites(), stat.getTotalWriteBytes(), 
stat.getTotalWriteErrors(), stat.getFileSizeInBytes()});
       }
     }
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
@@ -431,4 +447,31 @@ public class CommitsCommand implements CommandMarker {
     return "Load sync state between " + 
HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
   }
+
+  /*
+  Finds the commit, replacecommit or deltacommit instant with the given time in the timeline, if any.
+  * */
+  private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline, 
String instantTime) throws IOException {
+    List<HoodieInstant> instants = Arrays.asList(
+            new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, 
instantTime),
+            new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, 
instantTime),
+            new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, 
instantTime));
+
+    Option<HoodieInstant> hoodieInstant = 
Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny());
+
+    return hoodieInstant;
+  }
+
+  private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline 
timeline, Option<HoodieInstant> hoodieInstant) throws IOException {
+    if (hoodieInstant.isPresent()) {
+      if 
(hoodieInstant.get().getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+        return 
Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
+            HoodieReplaceCommitMetadata.class));
+      }
+      return 
Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(),
+              HoodieCommitMetadata.class));
+    }
+
+    return Option.empty();
+  }
 }
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 5ad4c4c..e88f129 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -24,10 +24,12 @@ import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -75,6 +77,7 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     new TableCommand().createTable(
         tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
         "", TimelineLayoutVersion.VERSION_1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+    metaClient = HoodieCLI.getTableMetaClient();
   }
 
   private LinkedHashMap<String, Integer[]> generateData() throws Exception {
@@ -97,6 +100,42 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     return data;
   }
 
+  /*
+  * Generates two commits (101, 102) and one replace commit (103); returns their data keyed by instant.
+  * */
+  private LinkedHashMap<HoodieInstant, Integer[]> generateMixedData() throws 
Exception {
+    // generate data and metadata
+    LinkedHashMap<HoodieInstant, Integer[]> replaceCommitData = new 
LinkedHashMap<>();
+    replaceCommitData.put(new HoodieInstant(false, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "103"), new Integer[] {15, 10});
+
+    LinkedHashMap<HoodieInstant, Integer[]> commitData = new LinkedHashMap<>();
+    commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, 
"102"), new Integer[] {15, 10});
+    commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, 
"101"), new Integer[] {20, 10});
+
+    for (Map.Entry<HoodieInstant, Integer[]> entry : commitData.entrySet()) {
+      String key = entry.getKey().getTimestamp();
+      Integer[] value = entry.getValue();
+      
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, 
jsc.hadoopConfiguration(),
+              Option.of(value[0]), Option.of(value[1]));
+    }
+
+    for (Map.Entry<HoodieInstant, Integer[]> entry : 
replaceCommitData.entrySet()) {
+      String key = entry.getKey().getTimestamp();
+      Integer[] value = entry.getValue();
+      
HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath,
 key,
+              Option.of(value[0]), Option.of(value[1]), metaClient);
+    }
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+    assertEquals(3, 
metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(),
+            "There should be 3 commits");
+
+    LinkedHashMap<HoodieInstant, Integer[]> data = replaceCommitData;
+    data.putAll(commitData);
+
+    return data;
+  }
+
   private String generateExpectData(int records, Map<String, Integer[]> data) 
throws IOException {
     FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
     List<String> partitionPaths =
@@ -216,14 +255,15 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     // prevCommit not null, so add 0, update 1
     Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
         
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
 ->
-        rows.add(new Comparable[] {partition, 0, 1, 0, value[1], 
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
+        rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, 0, 
1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
     );
 
     Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
     
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
         entry -> 
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
 
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
@@ -237,6 +277,43 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     assertEquals(expected, got);
   }
 
+  @Test
+  public void testShowCommitPartitionsWithReplaceCommits() throws Exception {
+    Map<HoodieInstant, Integer[]> data = generateMixedData();
+
+    for (HoodieInstant commitInstant: data.keySet()) {
+      CommandResult cr = getShell().executeCommand(String.format("commit 
showpartitions --commit %s", commitInstant.getTimestamp()));
+
+      assertTrue(cr.isSuccess());
+
+      Integer[] value = data.get(commitInstant);
+      List<Comparable[]> rows = new ArrayList<>();
+      // prevCommit not null, so add 0, update 1
+      Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+          
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
 ->
+          rows.add(new Comparable[] {commitInstant.getAction(), partition, 0, 
1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0})
+      );
+
+      Map<String, Function<Object, String>> fieldNameToConverterMap = new 
HashMap<>();
+      
fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN,
+          entry -> 
NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString()))));
+
+      TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS);
+
+      String expected = HoodiePrintHelper.print(header, 
fieldNameToConverterMap, "", false, -1, false, rows);
+      expected = removeNonWordAndStripSpace(expected);
+      String got = removeNonWordAndStripSpace(cr.getResult().toString());
+      assertEquals(expected, got);
+    }
+  }
+
   /**
    * Test case of 'commit showfiles' command.
    */
@@ -252,12 +329,13 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     List<Comparable[]> rows = new ArrayList<>();
     Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
         
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
 ->
-        rows.add(new Comparable[] {partition, 
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
+        rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, 
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
             HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
             value[1], value[0], 
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
             // default 0 errors and blank file with 0 size
             0, 0}));
-    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+    TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
         .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
         
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
@@ -272,6 +350,40 @@ public class TestCommitsCommand extends 
AbstractShellIntegrationTest {
     assertEquals(expected, got);
   }
 
+  @Test
+  public void testShowCommitFilesWithReplaceCommits() throws Exception {
+    Map<HoodieInstant, Integer[]> data = generateMixedData();
+
+    for (HoodieInstant commitInstant : data.keySet()) {
+      CommandResult cr = getShell().executeCommand(String.format("commit 
showfiles --commit %s", commitInstant.getTimestamp()));
+      assertTrue(cr.isSuccess());
+
+      Integer[] value = data.get(commitInstant);
+      List<Comparable[]> rows = new ArrayList<>();
+      Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
+          
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition
 ->
+          rows.add(new Comparable[] {commitInstant.getAction(), partition, 
HoodieTestCommitMetadataGenerator.DEFAULT_FILEID,
+              HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT,
+              value[1], value[0], 
HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES,
+              // default 0 errors and blank file with 0 size
+              0, 0}));
+      TableHeader header = new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN)
+          
.addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE);
+
+      String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", 
false, -1, false, rows);
+      expected = removeNonWordAndStripSpace(expected);
+      String got = removeNonWordAndStripSpace(cr.getResult().toString());
+      assertEquals(expected, got);
+    }
+  }
+
   /**
    * Test case of 'commits compare' command.
    */
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
index f4d8019..c33bb26 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -76,14 +77,17 @@ public class HoodieTestCommitMetadataGenerator extends 
HoodieTestDataGenerator {
     List<String> commitFileNames = 
Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), 
HoodieTimeline.makeInflightCommitFileName(commitTime),
         HoodieTimeline.makeRequestedCommitFileName(commitTime));
     for (String name : commitFileNames) {
-      Path commitFilePath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
-      try (FSDataOutputStream os = FSUtils.getFs(basePath, 
configuration).create(commitFilePath, true)) {
-        // Generate commitMetadata
-        HoodieCommitMetadata commitMetadata =
-            generateCommitMetadata(basePath, commitTime, fileId1, fileId2, 
writes, updates);
-        // Write empty commit metadata
-        os.writeBytes(new 
String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
-      }
+      HoodieCommitMetadata commitMetadata =
+              generateCommitMetadata(basePath, commitTime, fileId1, fileId2, 
writes, updates);
+      String content = commitMetadata.toJsonString();
+      createFileWithMetadata(basePath, configuration, name, content);
+    }
+  }
+
+  static void createFileWithMetadata(String basePath, Configuration 
configuration, String name, String content) throws IOException {
+    Path commitFilePath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/" + name);
+    try (FSDataOutputStream os = FSUtils.getFs(basePath, 
configuration).create(commitFilePath, true)) {
+      os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8)));
     }
   }
 
@@ -133,4 +137,5 @@ public class HoodieTestCommitMetadataGenerator extends 
HoodieTestDataGenerator {
     }));
     return metadata;
   }
+
 }
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
new file mode 100644
index 0000000..f7244f9
--- /dev/null
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.testutils;
+
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableList;
+
+public class HoodieTestReplaceCommitMetadatGenerator extends 
HoodieTestCommitMetadataGenerator {
+  public static void createReplaceCommitFileWithMetadata(String basePath, 
String commitTime, Option<Integer> writes, Option<Integer> updates,
+                                                         HoodieTableMetaClient 
metaclient) throws Exception {
+
+    HoodieReplaceCommitMetadata replaceMetadata = 
generateReplaceCommitMetadata(basePath, commitTime, 
UUID.randomUUID().toString(),
+        UUID.randomUUID().toString(), writes, updates);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
getHoodieRequestedReplaceMetadata();
+
+    HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, 
requestedReplaceMetadata, replaceMetadata);
+  }
+
+  private static HoodieRequestedReplaceMetadata 
getHoodieRequestedReplaceMetadata() {
+    return HoodieRequestedReplaceMetadata.newBuilder()
+        .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
+        .setVersion(1)
+        .setExtraMetadata(Collections.emptyMap())
+        .build();
+  }
+
+  private static HoodieReplaceCommitMetadata 
generateReplaceCommitMetadata(String basePath, String commitTime, String 
fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates)
+      throws Exception {
+    FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, 
commitTime, fileId1);
+    FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, 
commitTime, fileId2);
+    return generateReplaceCommitMetadata(new HashMap<String, List<String>>() {
+      {
+        put(DEFAULT_FIRST_PARTITION_PATH, 
createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1)));
+        put(DEFAULT_SECOND_PARTITION_PATH, 
createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2)));
+      }
+    }, writes, updates);
+  }
+
+  private static HoodieReplaceCommitMetadata 
generateReplaceCommitMetadata(HashMap<String, List<String>> 
partitionToFilePaths, Option<Integer> writes, Option<Integer> updates) {
+    HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata();
+    partitionToFilePaths.forEach((key, value) -> value.forEach(f -> {
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setPartitionPath(key);
+      writeStat.setPath(DEFAULT_PATH);
+      writeStat.setFileId(DEFAULT_FILEID);
+      writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES);
+      writeStat.setPrevCommit(DEFAULT_PRE_COMMIT);
+      writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES));
+      writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES));
+      writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS);
+      writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS);
+      metadata.addWriteStat(key, writeStat);
+    }));
+    metadata.setPartitionToReplaceFileIds(new HashMap<String, List<String>>() {
+      {
+        //TODO fix
+        put(DEFAULT_FIRST_PARTITION_PATH, 
createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, "1")));
+      }
+    });
+    return metadata;
+  }
+}

Reply via email to