This is an automated email from the ASF dual-hosted git repository. satish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4a34318 [HUDI-1746] Added support for replace commits in commit showpartitions, commit show_write_stats, commit showfiles (#2678) 4a34318 is described below commit 4a3431866d995fd877a067967457591a0bc8847d Author: jsbali <jagmeet.b...@gmail.com> AuthorDate: Wed Apr 21 23:01:35 2021 +0530 [HUDI-1746] Added support for replace commits in commit showpartitions, commit show_write_stats, commit showfiles (#2678) * Added support for replace commits in commit showpartitions, commit show_write_stats, commit showfiles * Adding CR changes * [HUDI-1746] Code review changes --- .../apache/hudi/cli/commands/CommitsCommand.java | 79 ++++++++++---- .../hudi/cli/commands/TestCommitsCommand.java | 120 ++++++++++++++++++++- .../HoodieTestCommitMetadataGenerator.java | 21 ++-- .../HoodieTestReplaceCommitMetadatGenerator.java | 92 ++++++++++++++++ 4 files changed, 282 insertions(+), 30 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index 3e216b4..9517234 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -26,6 +26,7 @@ import org.apache.hudi.cli.utils.CommitUtil; import org.apache.hudi.cli.utils.InputStreamConsumer; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -34,6 +35,7 @@ import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.spark.launcher.SparkLauncher; @@ -44,6 +46,7 @@ import org.springframework.stereotype.Component; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -266,15 +269,18 @@ public class CommitsCommand implements CommandMarker { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); - if (!timeline.containsInstant(commitInstant)) { + Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime); + Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); + + if (!commitMetadataOptional.isPresent()) { return "Commit " + instantTime + " not found in Commits " + timeline; } - HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), - HoodieCommitMetadata.class); + + HoodieCommitMetadata meta = commitMetadataOptional.get(); List<Comparable[]> rows = new ArrayList<>(); for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) { + String action = hoodieInstantOption.get().getAction(); String path = entry.getKey(); List<HoodieWriteStat> stats = entry.getValue(); long totalFilesAdded = 0; @@ -294,7 +300,7 @@ public class CommitsCommand implements CommandMarker { totalBytesWritten += stat.getTotalWriteBytes(); totalWriteErrors += stat.getTotalWriteErrors(); } - rows.add(new Comparable[] {path, totalFilesAdded, totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated, + rows.add(new Comparable[] {action, path, totalFilesAdded, totalFilesUpdated, totalRecordsInserted, totalRecordsUpdated, totalBytesWritten, totalWriteErrors}); } @@ -302,7 +308,8 @@ public class CommitsCommand implements CommandMarker { fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) @@ -328,24 +335,29 @@ public class CommitsCommand implements CommandMarker { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); - if (!timeline.containsInstant(commitInstant)) { + Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime); + Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); + + if (!commitMetadataOptional.isPresent()) { return "Commit " + instantTime + " not found in Commits " + timeline; } - HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), - HoodieCommitMetadata.class); + + HoodieCommitMetadata meta = commitMetadataOptional.get(); + + String action = hoodieInstantOption.get().getAction(); long recordsWritten = meta.fetchTotalRecordsWritten(); long bytesWritten = meta.fetchTotalBytesWritten(); long avgRecSize = (long) Math.ceil((1.0 * bytesWritten) / recordsWritten); List<Comparable[]> rows = new ArrayList<>(); - rows.add(new Comparable[] {bytesWritten, recordsWritten, avgRecSize}); + rows.add(new Comparable[] {action, bytesWritten, recordsWritten, avgRecSize}); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT) + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN_COMMIT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN_COMMIT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_AVG_REC_SIZE_COMMIT); @@ -367,24 +379,28 @@ public class CommitsCommand implements CommandMarker { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime); - if (!timeline.containsInstant(commitInstant)) { + Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime); + Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(timeline, hoodieInstantOption); + + if (!commitMetadataOptional.isPresent()) { return "Commit " + instantTime + " not found in Commits " + timeline; } - HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(commitInstant).get(), - HoodieCommitMetadata.class); + + HoodieCommitMetadata meta = commitMetadataOptional.get(); List<Comparable[]> rows = new ArrayList<>(); for (Map.Entry<String, List<HoodieWriteStat>> entry : meta.getPartitionToWriteStats().entrySet()) { + String action = hoodieInstantOption.get().getAction(); String path = entry.getKey(); List<HoodieWriteStat> stats = entry.getValue(); for (HoodieWriteStat stat : stats) { - rows.add(new Comparable[] {path, stat.getFileId(), stat.getPrevCommit(), stat.getNumUpdateWrites(), + rows.add(new Comparable[] {action, path, stat.getFileId(), stat.getPrevCommit(), stat.getNumUpdateWrites(), stat.getNumWrites(), stat.getTotalWriteBytes(), stat.getTotalWriteErrors(), stat.getFileSizeInBytes()}); } } - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) @@ -431,4 +447,31 @@ public class CommitsCommand implements CommandMarker { return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and " + HoodieCLI.syncTableMetadata.getTableConfig().getTableName(); } + + /* + Checks whether a commit or replacecommit action exists in the timeline. + * */ + private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline, String instantTime) throws IOException { + List<HoodieInstant> instants = Arrays.asList( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime), + new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime)); + + Option<HoodieInstant> hoodieInstant = Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny()); + + return hoodieInstant; + } + + private Option<HoodieCommitMetadata> getHoodieCommitMetadata(HoodieTimeline timeline, Option<HoodieInstant> hoodieInstant) throws IOException { + if (hoodieInstant.isPresent()) { + if (hoodieInstant.get().getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + return Option.of(HoodieReplaceCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(), + HoodieReplaceCommitMetadata.class)); + } + return Option.of(HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(hoodieInstant.get()).get(), + HoodieCommitMetadata.class)); + } + + return Option.empty(); + } } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index 5ad4c4c..e88f129 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -24,10 +24,12 @@ import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadatGenerator; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestUtils; @@ -75,6 +77,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { new TableCommand().createTable( tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(), "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload"); + metaClient = HoodieCLI.getTableMetaClient(); } private LinkedHashMap<String, Integer[]> generateData() throws Exception { @@ -97,6 +100,42 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { return data; } + /* + * generates both replace commit and commit data + * */ + private LinkedHashMap<HoodieInstant, Integer[]> generateMixedData() throws Exception { + // generate data and metadata + LinkedHashMap<HoodieInstant, Integer[]> replaceCommitData = new LinkedHashMap<>(); + replaceCommitData.put(new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, "103"), new Integer[] {15, 10}); + + LinkedHashMap<HoodieInstant, Integer[]> commitData = new LinkedHashMap<>(); + commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "102"), new Integer[] {15, 10}); + commitData.put(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "101"), new Integer[] {20, 10}); + + for (Map.Entry<HoodieInstant, Integer[]> entry : commitData.entrySet()) { + String key = entry.getKey().getTimestamp(); + Integer[] value = entry.getValue(); + HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, key, jsc.hadoopConfiguration(), + Option.of(value[0]), Option.of(value[1])); + } + + for (Map.Entry<HoodieInstant, Integer[]> entry : replaceCommitData.entrySet()) { + String key = entry.getKey().getTimestamp(); + Integer[] value = entry.getValue(); + HoodieTestReplaceCommitMetadatGenerator.createReplaceCommitFileWithMetadata(tablePath, key, + Option.of(value[0]), Option.of(value[1]), metaClient); + } + + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + assertEquals(3, metaClient.reloadActiveTimeline().getCommitsTimeline().countInstants(), + "There should be 3 commits"); + + LinkedHashMap<HoodieInstant, Integer[]> data = replaceCommitData; + data.putAll(commitData); + + return data; + } + private String generateExpectData(int records, Map<String, Integer[]> data) throws IOException { FileSystem fs = FileSystem.get(jsc.hadoopConfiguration()); List<String> partitionPaths = @@ -216,14 +255,15 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { // prevCommit not null, so add 0, update 1 Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> - rows.add(new Comparable[] {partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0}) + rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0}) ); Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) @@ -237,6 +277,43 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { assertEquals(expected, got); } + @Test + public void testShowCommitPartitionsWithReplaceCommits() throws Exception { + Map<HoodieInstant, Integer[]> data = generateMixedData(); + + for (HoodieInstant commitInstant: data.keySet()) { + CommandResult cr = getShell().executeCommand(String.format("commit showpartitions --commit %s", commitInstant.getTimestamp())); + + assertTrue(cr.isSuccess()); + + Integer[] value = data.get(commitInstant); + List<Comparable[]> rows = new ArrayList<>(); + // prevCommit not null, so add 0, update 1 + Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> + rows.add(new Comparable[] {commitInstant.getAction(), partition, 0, 1, 0, value[1], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, 0}) + ); + + Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN, + entry -> NumericUtils.humanReadableByteCount((Long.parseLong(entry.toString())))); + + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_ADDED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_INSERTED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS); + + String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + } + /** * Test case of 'commit showfiles' command. */ @@ -252,12 +329,13 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { List<Comparable[]> rows = new ArrayList<>(); Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> - rows.add(new Comparable[] {partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, + rows.add(new Comparable[] {HoodieTimeline.COMMIT_ACTION, partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, // default 0 errors and blank file with 0 size 0, 0})); - TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) @@ -272,6 +350,40 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { assertEquals(expected, got); } + @Test + public void testShowCommitFilesWithReplaceCommits() throws Exception { + Map<HoodieInstant, Integer[]> data = generateMixedData(); + + for (HoodieInstant commitInstant : data.keySet()) { + CommandResult cr = getShell().executeCommand(String.format("commit showfiles --commit %s", commitInstant.getTimestamp())); + assertTrue(cr.isSuccess()); + + Integer[] value = data.get(commitInstant); + List<Comparable[]> rows = new ArrayList<>(); + Arrays.asList(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).stream().forEach(partition -> + rows.add(new Comparable[] {commitInstant.getAction(), partition, HoodieTestCommitMetadataGenerator.DEFAULT_FILEID, + HoodieTestCommitMetadataGenerator.DEFAULT_PRE_COMMIT, + value[1], value[0], HoodieTestCommitMetadataGenerator.DEFAULT_TOTAL_WRITE_BYTES, + // default 0 errors and blank file with 0 size + 0, 0})); + TableHeader header = new TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_PREVIOUS_COMMIT) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_UPDATED) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_RECORDS_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_BYTES_WRITTEN) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_ERRORS) + .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_SIZE); + + String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows); + expected = removeNonWordAndStripSpace(expected); + String got = removeNonWordAndStripSpace(cr.getResult().toString()); + assertEquals(expected, got); + } + } + /** * Test case of 'commits compare' command. */ diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java index f4d8019..c33bb26 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestCommitMetadataGenerator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -76,14 +77,17 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { List<String> commitFileNames = Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime), HoodieTimeline.makeRequestedCommitFileName(commitTime)); for (String name : commitFileNames) { - Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name); - try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) { - // Generate commitMetadata - HoodieCommitMetadata commitMetadata = - generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates); - // Write empty commit metadata - os.writeBytes(new String(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } + HoodieCommitMetadata commitMetadata = + generateCommitMetadata(basePath, commitTime, fileId1, fileId2, writes, updates); + String content = commitMetadata.toJsonString(); + createFileWithMetadata(basePath, configuration, name, content); + } + } + + static void createFileWithMetadata(String basePath, Configuration configuration, String name, String content) throws IOException { + Path commitFilePath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + name); + try (FSDataOutputStream os = FSUtils.getFs(basePath, configuration).create(commitFilePath, true)) { + os.writeBytes(new String(content.getBytes(StandardCharsets.UTF_8))); } } @@ -133,4 +137,5 @@ public class HoodieTestCommitMetadataGenerator extends HoodieTestDataGenerator { })); return metadata; } + } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java new file mode 100644 index 0000000..f7244f9 --- /dev/null +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/HoodieTestReplaceCommitMetadatGenerator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.cli.testutils; + +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.Option; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName; +import static org.apache.hudi.common.util.CollectionUtils.createImmutableList; + +public class HoodieTestReplaceCommitMetadatGenerator extends HoodieTestCommitMetadataGenerator { + public static void createReplaceCommitFileWithMetadata(String basePath, String commitTime, Option<Integer> writes, Option<Integer> updates, + HoodieTableMetaClient metaclient) throws Exception { + + HoodieReplaceCommitMetadata replaceMetadata = generateReplaceCommitMetadata(basePath, commitTime, UUID.randomUUID().toString(), + UUID.randomUUID().toString(), writes, updates); + HoodieRequestedReplaceMetadata requestedReplaceMetadata = getHoodieRequestedReplaceMetadata(); + + HoodieTestTable.of(metaclient).addReplaceCommit(commitTime, requestedReplaceMetadata, replaceMetadata); + } + + private static HoodieRequestedReplaceMetadata getHoodieRequestedReplaceMetadata() { + return HoodieRequestedReplaceMetadata.newBuilder() + .setOperationType(WriteOperationType.INSERT_OVERWRITE.toString()) + .setVersion(1) + .setExtraMetadata(Collections.emptyMap()) + .build(); + } + + private static HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String basePath, String commitTime, String fileId1, String fileId2, Option<Integer> writes, Option<Integer> updates) + throws Exception { + FileCreateUtils.createBaseFile(basePath, DEFAULT_FIRST_PARTITION_PATH, commitTime, fileId1); + FileCreateUtils.createBaseFile(basePath, DEFAULT_SECOND_PARTITION_PATH, commitTime, fileId2); + return generateReplaceCommitMetadata(new HashMap<String, List<String>>() { + { + put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, fileId1))); + put(DEFAULT_SECOND_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_SECOND_PARTITION_PATH, fileId2))); + } + }, writes, updates); + } + + private static HoodieReplaceCommitMetadata generateReplaceCommitMetadata(HashMap<String, List<String>> partitionToFilePaths, Option<Integer> writes, Option<Integer> updates) { + HoodieReplaceCommitMetadata metadata = new HoodieReplaceCommitMetadata(); + partitionToFilePaths.forEach((key, value) -> value.forEach(f -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(key); + writeStat.setPath(DEFAULT_PATH); + writeStat.setFileId(DEFAULT_FILEID); + writeStat.setTotalWriteBytes(DEFAULT_TOTAL_WRITE_BYTES); + writeStat.setPrevCommit(DEFAULT_PRE_COMMIT); + writeStat.setNumWrites(writes.orElse(DEFAULT_NUM_WRITES)); + writeStat.setNumUpdateWrites(updates.orElse(DEFAULT_NUM_UPDATE_WRITES)); + writeStat.setTotalLogBlocks(DEFAULT_TOTAL_LOG_BLOCKS); + writeStat.setTotalLogRecords(DEFAULT_TOTAL_LOG_RECORDS); + metadata.addWriteStat(key, writeStat); + })); + metadata.setPartitionToReplaceFileIds(new HashMap<String, List<String>>() { + { + //TODO fix + put(DEFAULT_FIRST_PARTITION_PATH, createImmutableList(baseFileName(DEFAULT_FIRST_PARTITION_PATH, "1"))); + } + }); + return metadata; + } +}