This is an automated email from the ASF dual-hosted git repository. nagarwal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 462fd02 [HUDI-571] Add 'commits show archived' command to CLI 462fd02 is described below commit 462fd025563b0ae8a4d4f28d366a9bbfca070d3f Author: Satish Kotha <satishko...@uber.com> AuthorDate: Wed Jan 22 13:50:34 2020 -0800 [HUDI-571] Add 'commits show archived' command to CLI --- .../apache/hudi/cli/commands/CommitsCommand.java | 105 +++++++++-- .../apache/hudi/io/TestHoodieCommitArchiveLog.java | 73 ++++---- .../apache/hudi/common/model/HoodieWriteStat.java | 3 +- .../apache/hudi/common/table/HoodieTimeline.java | 8 + .../table/timeline/HoodieActiveTimeline.java | 89 ---------- .../table/timeline/HoodieArchivedTimeline.java | 192 ++++++++++++++++++--- .../table/timeline/HoodieDefaultTimeline.java | 81 ++++++++- .../common/table/TestHoodieTableMetaClient.java | 35 ---- 8 files changed, 385 insertions(+), 201 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java index c0f8ead..3a11e58 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java @@ -28,9 +28,12 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.NumericUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.spark.launcher.SparkLauncher; import org.springframework.shell.core.CommandMarker; import org.springframework.shell.core.annotation.CliCommand; @@ -38,7 +41,10 @@ import org.springframework.shell.core.annotation.CliOption; import org.springframework.stereotype.Component; import java.io.IOException; +import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -51,6 +57,49 @@ import java.util.stream.Collectors; @Component public class CommitsCommand implements CommandMarker { + private String printCommits(HoodieDefaultTimeline timeline, + final Integer limit, final String sortByField, + final boolean descending, + final boolean headerOnly) throws IOException { + final List<Comparable[]> rows = new ArrayList<>(); + + final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants() + .getInstants().collect(Collectors.toList()); + // timeline can be read from multiple files. So sort is needed instead of reversing the collection + Collections.sort(commits, HoodieInstant.COMPARATOR.reversed()); + + for (int i = 0; i < commits.size(); i++) { + final HoodieInstant commit = commits.get(i); + final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(commit).get(), + HoodieCommitMetadata.class); + rows.add(new Comparable[]{commit.getTimestamp(), + commitMetadata.fetchTotalBytesWritten(), + commitMetadata.fetchTotalFilesInsert(), + commitMetadata.fetchTotalFilesUpdated(), + commitMetadata.fetchTotalPartitionsWritten(), + commitMetadata.fetchTotalRecordsWritten(), + commitMetadata.fetchTotalUpdateRecordsWritten(), + commitMetadata.fetchTotalWriteErrors()}); + } + + final Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); + fieldNameToConverterMap.put("Total Bytes Written", entry -> { + return NumericUtils.humanReadableByteCount((Double.valueOf(entry.toString()))); + }); + + final TableHeader header = new TableHeader() + .addTableHeaderField("CommitTime") + .addTableHeaderField("Total Bytes Written") + .addTableHeaderField("Total Files Added") + .addTableHeaderField("Total Files Updated") + .addTableHeaderField("Total Partitions Written") + .addTableHeaderField("Total Records Written") + .addTableHeaderField("Total Update Records Written") + .addTableHeaderField("Total Errors"); + return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + } + @CliCommand(value = "commits show", help = "Show the commits") public String showCommits( @CliOption(key = {"limit"}, help = "Limit commits", @@ -62,26 +111,39 @@ public class CommitsCommand implements CommandMarker { throws IOException { HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants(); - List<HoodieInstant> commits = timeline.getReverseOrderedInstants().collect(Collectors.toList()); - List<Comparable[]> rows = new ArrayList<>(); - for (HoodieInstant commit : commits) { - HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); - rows.add(new Comparable[] {commit.getTimestamp(), commitMetadata.fetchTotalBytesWritten(), - commitMetadata.fetchTotalFilesInsert(), commitMetadata.fetchTotalFilesUpdated(), - commitMetadata.fetchTotalPartitionsWritten(), commitMetadata.fetchTotalRecordsWritten(), - commitMetadata.fetchTotalUpdateRecordsWritten(), commitMetadata.fetchTotalWriteErrors()}); - } - - Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>(); - fieldNameToConverterMap.put("Total Bytes Written", entry -> NumericUtils.humanReadableByteCount((Double.parseDouble(entry.toString())))); + return printCommits(activeTimeline, limit, sortByField, descending, headerOnly); + } - TableHeader header = new TableHeader().addTableHeaderField("CommitTime").addTableHeaderField("Total Bytes Written") - .addTableHeaderField("Total Files Added").addTableHeaderField("Total Files Updated") - .addTableHeaderField("Total Partitions Written").addTableHeaderField("Total Records Written") - .addTableHeaderField("Total Update Records Written").addTableHeaderField("Total Errors"); - return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows); + @CliCommand(value = "commits show archived", help = "Show the archived commits") + public String showArchivedCommits( + @CliOption(key = {"startTs"}, mandatory = false, help = "start time for commits, default: now - 10 days") + String startTs, + @CliOption(key = {"endTs"}, mandatory = false, help = "end time for commits, default: now - 1 day") + String endTs, + @CliOption(key = {"limit"}, mandatory = false, help = "Limit commits", unspecifiedDefaultValue = "-1") + final Integer limit, + @CliOption(key = {"sortBy"}, help = "Sorting Field", unspecifiedDefaultValue = "") + final String sortByField, + @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = "false") + final boolean descending, + @CliOption(key = {"headeronly"}, help = "Print Header Only", unspecifiedDefaultValue = "false") + final boolean headerOnly) + throws IOException { + if (StringUtils.isNullOrEmpty(startTs)) { + startTs = getTimeDaysAgo(10); + } + if (StringUtils.isNullOrEmpty(endTs)) { + endTs = getTimeDaysAgo(1); + } + HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline(); + try { + archivedTimeline.loadInstantDetailsInMemory(startTs, endTs); + return printCommits(archivedTimeline.findInstantsInRange(startTs, endTs), + limit, sortByField, descending, headerOnly); + } finally { + // clear the instant details from memory after printing to reduce usage + archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs); + } } @CliCommand(value = "commits refresh", help = "Refresh the commits") @@ -241,4 +303,9 @@ public class CommitsCommand implements CommandMarker { + HoodieCLI.syncTableMetadata.getTableConfig().getTableName(); } + private String getTimeDaysAgo(int numberOfDays) { + Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant()); + return HoodieActiveTimeline.COMMIT_FORMATTER.format(date); + } + } diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java index c0fb1ad..eccbc7a 100644 --- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java +++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java @@ -19,24 +19,18 @@ package org.apache.hudi.io; import org.apache.hudi.HoodieClientTestHarness; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.common.HoodieTestDataGenerator; -import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.HoodieLogFormat.Reader; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import com.google.common.collect.Sets; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.junit.After; @@ -44,7 +38,8 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -197,35 +192,18 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { instants.contains(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105"))); // read the file - Reader reader = - HoodieLogFormat.newReader(dfs, new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")), - HoodieArchivedMetaEntry.getClassSchema()); - int archivedRecordsCount = 0; - List<IndexedRecord> readRecords = new ArrayList<>(); - // read the avro blocks and validate the number of records written in each avro block - int numBlocks = 0; - while (reader.hasNext()) { - HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); - List<IndexedRecord> records = blk.getRecords(); - readRecords.addAll(records); - archivedRecordsCount += records.size(); - numBlocks++; - } - System.out.println("Read Records :" + readRecords.stream().map(r -> (GenericRecord) r) - .map(r -> r.get("actionType") + "_" + r.get("actionState") + "_" + r.get("commitTime")).collect(Collectors.toList())); - assertEquals("Total archived records and total read records are the same count", 24, archivedRecordsCount); - assertTrue("Average Archived records per block is greater than 1", archivedRecordsCount / numBlocks > 1); - // make sure the archived commits are the same as the (originalcommits - commitsleft) - Set<String> readCommits = readRecords.stream().map(r -> (GenericRecord) r).map(r -> { - return r.get("commitTime").toString(); - }).collect(Collectors.toSet()); + HoodieArchivedTimeline archivedTimeline = new HoodieArchivedTimeline(metaClient); + assertEquals("Total archived records and total read records are the same count", + 24, archivedTimeline.countInstants()); + //make sure the archived commits are the same as the (originalcommits - commitsleft) + Set<String> readCommits = + archivedTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); assertEquals("Read commits map should match the originalCommits - commitsLoadedFromArchival", - originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits); + originalCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), readCommits); // verify in-flight instants after archive verifyInflightInstants(metaClient, 2); - reader.close(); } @Test @@ -397,6 +375,37 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness { timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "107"))); } + @Test + public void checkArchiveCommitTimeline() throws IOException, InterruptedException { + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + .withParallelism(2, 2).forTable("test-trip-table") + .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build()) + .build(); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient); + + HoodieTestDataGenerator.createCommitFile(basePath, "1", dfs.getConf()); + HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1"); + HoodieTestDataGenerator.createCommitFile(basePath, "2", dfs.getConf()); + HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2"); + HoodieTestDataGenerator.createCommitFile(basePath, "3", dfs.getConf()); + HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); + + //add 2 more instants to pass filter criteria set in compaction config above + HoodieTestDataGenerator.createCommitFile(basePath, "4", dfs.getConf()); + HoodieInstant instant4 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "4"); + HoodieTestDataGenerator.createCommitFile(basePath, "5", dfs.getConf()); + HoodieInstant instant5 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "5"); + + boolean result = archiveLog.archiveIfRequired(jsc); + assertTrue(result); + + HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); + List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3); + assertEquals(new HashSet(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet())); + } + private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) { HoodieTimeline timeline = metaClient.getActiveTimeline().reload() .getTimelineOfActions(Sets.newHashSet(HoodieTimeline.CLEAN_ACTION)).filterInflights(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 0135dbe..97288df 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -135,6 +135,7 @@ public class HoodieWriteStat implements Serializable { /** * Total number of rollback blocks seen in a compaction operation. */ + @Nullable private long totalRollbackBlocks; /** @@ -290,7 +291,7 @@ public class HoodieWriteStat implements Serializable { return totalRollbackBlocks; } - public void setTotalRollbackBlocks(Long totalRollbackBlocks) { + public void setTotalRollbackBlocks(long totalRollbackBlocks) { this.totalRollbackBlocks = totalRollbackBlocks; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java index 015a497..575a9ea 100755 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTimeline.java @@ -234,6 +234,14 @@ public interface HoodieTimeline extends Serializable { return predicateToApply.test(commit1, commit2); } + /** + * Return true if specified timestamp is in range (startTs, endTs]. + */ + static boolean isInRange(String timestamp, String startTs, String endTs) { + return HoodieTimeline.compareTimestamps(timestamp, startTs, GREATER) + && HoodieTimeline.compareTimestamps(timestamp, endTs, LESSER_OR_EQUAL); + } + static HoodieInstant getCompletedInstant(final HoodieInstant instant) { return new HoodieInstant(State.COMPLETED, instant.getAction(), instant.getTimestamp()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index f322d47..e5829f8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -27,7 +27,6 @@ import org.apache.hudi.exception.HoodieIOException; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; @@ -45,7 +44,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -134,93 +132,6 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { in.defaultReadObject(); } - /** - * Get all instants (commits, delta commits) that produce new data, in the active timeline. - */ - public HoodieTimeline getCommitsTimeline() { - return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); - } - - /** - * Get all instants (commits, delta commits, in-flight/request compaction) that produce new data, in the active - * timeline * With Async compaction a requested/inflight compaction-instant is a valid baseInstant for a file-slice as - * there could be delta-commits with that baseInstant. - */ - @Override - public HoodieTimeline getCommitsAndCompactionTimeline() { - return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION)); - } - - /** - * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active - * timeline. - */ - public HoodieTimeline getAllCommitsTimeline() { - return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION, - SAVEPOINT_ACTION, ROLLBACK_ACTION)); - } - - /** - * Get only pure commits (inflight and completed) in the active timeline. - */ - public HoodieTimeline getCommitTimeline() { - return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION)); - } - - /** - * Get only the delta commits (inflight and completed) in the active timeline. - */ - public HoodieTimeline getDeltaCommitTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - /** - * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions. - * - * @param actions actions allowed in the timeline - */ - public HoodieTimeline getTimelineOfActions(Set<String> actions) { - return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - /** - * Get only the cleaner action (inflight and completed) in the active timeline. - */ - public HoodieTimeline getCleanerTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - /** - * Get only the rollback action (inflight and completed) in the active timeline. - */ - public HoodieTimeline getRollbackTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - /** - * Get only the save point action (inflight and completed) in the active timeline. - */ - public HoodieTimeline getSavePointTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - /** - * Get only the restore action (inflight and completed) in the active timeline. - */ - public HoodieTimeline getRestoreTimeline() { - return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION), - (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); - } - - protected Stream<HoodieInstant> filterInstantsByAction(String action) { - return getInstants().filter(s -> s.getAction().equals(action)); - } - public void createNewInstant(HoodieInstant instant) { LOG.info("Creating a new instant " + instant); // Create the in-flight file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 4e45925..a2ad80c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -18,24 +18,36 @@ package org.apache.hudi.common.table.timeline; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -49,34 +61,27 @@ import java.util.stream.Collectors; * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. */ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { + private static final Pattern ARCHIVE_FILE_PATTERN = + Pattern.compile("^\\.commits_\\.archive\\.([0-9]*)$"); - private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE = "commits"; + private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits"; + private static final String ACTION_TYPE_KEY = "actionType"; private HoodieTableMetaClient metaClient; private Map<String, byte[]> readCommits = new HashMap<>(); private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class); + /** + * Loads instants between (startTs, endTs]. + * Note that there is no lazy loading, so this may not work if really long time range (endTs-startTs) is specified. + * TBD: Should we enforce maximum time range? + */ public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) { - // Read back the commits to make sure - Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath()); - try (SequenceFile.Reader reader = - new SequenceFile.Reader(metaClient.getHadoopConf(), SequenceFile.Reader.file(archiveLogPath))) { - Text key = new Text(); - Text val = new Text(); - while (reader.next(key, val)) { - // TODO - limit the number of commits loaded in memory. this could get very large. - // This is okay because only tooling will load the archived commit timeline today - readCommits.put(key.toString(), Arrays.copyOf(val.getBytes(), val.getLength())); - } - this.setInstants(readCommits.keySet().stream().map(s -> new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, s)) - .collect(Collectors.toList())); - } catch (IOException e) { - throw new HoodieIOException("Could not load archived commit timeline from path " + archiveLogPath, e); - } + this.metaClient = metaClient; + setInstants(this.loadInstants(false)); // multiple casts will make this lambda serializable - // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16 this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails; - this.metaClient = metaClient; } /** @@ -96,7 +101,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { } public static Path getArchiveLogPath(String archiveFolder) { - return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE); + return new Path(archiveFolder, HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX); + } + + public void loadInstantDetailsInMemory(String startTs, String endTs) { + loadInstants(startTs, endTs); + } + + public void clearInstantDetailsFromMemory(String startTs, String endTs) { + this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant -> + this.readCommits.remove(instant.getTimestamp())); } @Override @@ -108,4 +122,136 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { return new HoodieArchivedTimeline(metaClient); } + private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) { + final String commitTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); + final String action = record.get(ACTION_TYPE_KEY).toString(); + if (loadDetails) { + Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> + this.readCommits.put(commitTime, actionData.toString().getBytes(StandardCharsets.UTF_8)) + ); + } + return new HoodieInstant(false, action, commitTime); + } + + private String getMetadataKey(String action) { + switch (action) { + case HoodieTimeline.CLEAN_ACTION: + return "hoodieCleanMetadata"; + case HoodieTimeline.COMMIT_ACTION: + return "hoodieCommitMetadata"; + case HoodieTimeline.DELTA_COMMIT_ACTION: + return "hoodieCommitMetadata"; + case HoodieTimeline.ROLLBACK_ACTION: + return "hoodieRollbackMetadata"; + case HoodieTimeline.SAVEPOINT_ACTION: + return "hoodieSavePointMetadata"; + default: + throw new HoodieIOException("Unknown action in metadata " + action); + } + } + + private List<HoodieInstant> loadInstants(boolean loadInstantDetails) { + return loadInstants(null, loadInstantDetails); + } + + private List<HoodieInstant> loadInstants(String startTs, String endTs) { + return loadInstants(new TimeRangeFilter(startTs, endTs), true); + } + + /** + * This is method to read selected instants. Do NOT use this directly use one of the helper methods above + * If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details + * If filter is specified, only the filtered instants are loaded + */ + private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) { + try { + // list all files + FileStatus[] fsStatuses = metaClient.getFs().globStatus( + new Path(metaClient.getArchivePath() + "/.commits_.archive*")); + + // sort files by version suffix in reverse (implies reverse chronological order) + Arrays.sort(fsStatuses, new ArchiveFileVersionComparator()); + + List<HoodieInstant> instantsInRange = new ArrayList<>(); + for (FileStatus fs : fsStatuses) { + //read the archived file + HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), + new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema()); + try { + int instantsInPreviousFile = instantsInRange.size(); + //read the avro blocks + while (reader.hasNext()) { + HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); + // TODO If we can store additional metadata in datablock, we can skip parsing records + // (such as startTime, endTime of records in the block) + List<IndexedRecord> records = blk.getRecords(); + // filter blocks in desired time window + Stream<HoodieInstant> instantsInBlkStream = records.stream() + .map(r -> readCommit((GenericRecord) r, loadInstantDetails)); + + if (filter != null) { + instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange); + } + + instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList())); + } + + if (filter != null) { + int instantsInCurrentFile = instantsInRange.size() - instantsInPreviousFile; + if (instantsInPreviousFile > 0 && instantsInCurrentFile == 0) { + // Note that this is an optimization to skip reading unnecessary archived files + // This signals we crossed lower bound of desired time window. + break; + } + } + } finally { + reader.close(); + } + } + + return instantsInRange; + } catch (IOException e) { + throw new HoodieIOException( + "Could not load archived commit timeline from path " + metaClient.getArchivePath(), e); + } + } + + private static class TimeRangeFilter { + private final String startTs; + private final String endTs; + + public TimeRangeFilter(String startTs, String endTs) { + this.startTs = startTs; + this.endTs = endTs; + } + + public boolean isInRange(HoodieInstant instant) { + return HoodieTimeline.isInRange(instant.getTimestamp(), this.startTs, this.endTs); + } + } + + /** + * Sort files by reverse order of version suffix in file name. + */ + public static class ArchiveFileVersionComparator implements Comparator<FileStatus>, Serializable { + @Override + public int compare(FileStatus f1, FileStatus f2) { + return Integer.compare(getArchivedFileSuffix(f2), getArchivedFileSuffix(f1)); + } + + private int getArchivedFileSuffix(FileStatus f) { + try { + Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(f.getPath().getName()); + if (fileMatcher.matches()) { + return Integer.parseInt(fileMatcher.group(1)); + } + } catch (NumberFormatException e) { + // log and ignore any format warnings + LOG.warn("error getting suffix for archived file: " + f.getPath()); + } + + // return default value in case of any errors + return 0; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 78d6c6f..9f06629 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.io.Serializable; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; @@ -126,8 +127,7 @@ public class HoodieDefaultTimeline implements HoodieTimeline { @Override public HoodieDefaultTimeline findInstantsInRange(String startTs, String endTs) { return new HoodieDefaultTimeline( - instants.stream().filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), startTs, GREATER) - && HoodieTimeline.compareTimestamps(s.getTimestamp(), endTs, LESSER_OR_EQUAL)), + instants.stream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details); } @@ -143,6 +143,83 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return new HoodieDefaultTimeline(instants.stream().filter(filter), details); } + /** + * Get all instants (commits, delta commits) that produce new data, in the active timeline. + */ + public HoodieTimeline getCommitsTimeline() { + return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION)); + } + + /** + * Get all instants (commits, delta commits, clean, savepoint, rollback) that result in actions, in the active + * timeline. + */ + public HoodieTimeline getAllCommitsTimeline() { + return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION, COMPACTION_ACTION, + SAVEPOINT_ACTION, ROLLBACK_ACTION)); + } + + /** + * Get only pure commits (inflight and completed) in the active timeline. + */ + public HoodieTimeline getCommitTimeline() { + return getTimelineOfActions(Sets.newHashSet(COMMIT_ACTION)); + } + + /** + * Get only the delta commits (inflight and completed) in the active timeline. + */ + public HoodieTimeline getDeltaCommitTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(DELTA_COMMIT_ACTION), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + /** + * Get a timeline of a specific set of actions. useful to create a merged timeline of multiple actions. + * + * @param actions actions allowed in the timeline + */ + public HoodieTimeline getTimelineOfActions(Set<String> actions) { + return new HoodieDefaultTimeline(getInstants().filter(s -> actions.contains(s.getAction())), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + /** + * Get only the cleaner action (inflight and completed) in the active timeline. + */ + public HoodieTimeline getCleanerTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(CLEAN_ACTION), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + /** + * Get only the rollback action (inflight and completed) in the active timeline. + */ + public HoodieTimeline getRollbackTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + /** + * Get only the save point action (inflight and completed) in the active timeline. + */ + public HoodieTimeline getSavePointTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(SAVEPOINT_ACTION), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + /** + * Get only the restore action (inflight and completed) in the active timeline. + */ + public HoodieTimeline getRestoreTimeline() { + return new HoodieDefaultTimeline(filterInstantsByAction(RESTORE_ACTION), + (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails); + } + + protected Stream<HoodieInstant> filterInstantsByAction(String action) { + return getInstants().filter(s -> s.getAction().equals(action)); + } + @Override public boolean empty() { return !instants.stream().findFirst().isPresent(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index 6864623..8b9f643 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -21,20 +21,13 @@ package org.apache.hudi.common.table; import org.apache.hudi.common.HoodieCommonTestHarness; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; import org.junit.Before; import org.junit.Test; import java.io.IOException; -import java.util.Arrays; -import java.util.stream.Collectors; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -100,32 +93,4 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(), activeCommitTimeline.getInstantDetails(completedInstant).get()); } - - @Test - public void checkArchiveCommitTimeline() throws IOException { - Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath()); - SequenceFile.Writer writer = - SequenceFile.createWriter(metaClient.getHadoopConf(), SequenceFile.Writer.file(archiveLogPath), - SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class)); - - writer.append(new Text("1"), new Text("data1")); - writer.append(new Text("2"), new Text("data2")); - writer.append(new Text("3"), new Text("data3")); - - IOUtils.closeStream(writer); - - HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); - - HoodieInstant instant1 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "1"); - HoodieInstant instant2 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "2"); - HoodieInstant instant3 = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "3"); - - assertEquals(Arrays.asList(instant1, instant2, instant3), - archivedTimeline.getInstants().collect(Collectors.toList())); - - assertArrayEquals(new Text("data1").getBytes(), archivedTimeline.getInstantDetails(instant1).get()); - assertArrayEquals(new Text("data2").getBytes(), archivedTimeline.getInstantDetails(instant2).get()); - assertArrayEquals(new Text("data3").getBytes(), archivedTimeline.getInstantDetails(instant3).get()); - } - }