vinothchandar commented on code in PR #9209: URL: https://github.com/apache/hudi/pull/9209#discussion_r1278642514
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveInstant.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * A combination of instants covering action states: requested, inflight, completed. + */ +public class ActiveInstant implements Serializable, Comparable<ActiveInstant> { + private final HoodieInstant requested; + private final HoodieInstant inflight; + private final HoodieInstant completed; + + /** + * The constructor. 
+ */ + protected ActiveInstant(@Nullable HoodieInstant requested, @Nullable HoodieInstant inflight, HoodieInstant completed) { + this.requested = requested; + this.inflight = inflight; + this.completed = completed; + } + + public static ActiveInstant fromInstants(List<HoodieInstant> instants) { + ValidationUtils.checkArgument(instants.size() <= 3); + HoodieInstant requested = null; + HoodieInstant inflight = null; + HoodieInstant completed = null; + for (HoodieInstant instant : instants) { + if (instant.isRequested()) { + requested = instant; + } else if (instant.isInflight()) { + inflight = instant; + } else { + completed = instant; + } + } + return new ActiveInstant(requested, inflight, Objects.requireNonNull(completed)); + } + + public List<HoodieInstant> getPendingInstants() { + List<HoodieInstant> instants = new ArrayList<>(2); + if (this.requested != null) { + instants.add(this.requested); + } + if (this.inflight != null) { + instants.add(this.inflight); + } + return instants; + } + + public HoodieInstant getCompleted() { + return completed; + } + + public String getAction() { + return this.completed.getAction(); + } + + /** + * A COMPACTION action eventually becomes COMMIT when completed. Review Comment: something to think about is - whether we keep it "COMPACTION" with the new changes. ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveInstant.java: ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * A combination of instants covering action states: requested, inflight, completed. + */ +public class ActiveInstant implements Serializable, Comparable<ActiveInstant> { Review Comment: Rename to "ActiveAction", since it's really the set of instants that take the action to its completed state?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> + * </ol> + * + * <p><h2>The Reader Workflow</h2> + * <ul> + * <li>read the latest version;</li> Review Comment: Can't we just read the latest manifest file? Why do we want a separate version file? What does the RocksDB layout look like on storage? Can we learn from that?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> + * </ol> + * + * <p><h2>The Reader Workflow</h2> + * <ul> + * <li>read the latest version;</li> + * <li>read the manifest file for valid file handles;</li> + * <li>probably do a data skipping with the parquet file name max min timestamp.</li> + * </ul> + * + * <p><h2>The Legacy Files Cleaning and Read Retention</h2> + * Only triggers file cleaning after a valid compaction. Review Comment: I think we can use OCC here for concurrency control between the LSM merge and the writer. Even just taking a lock and letting either the write or the LSM merge fail if something else ran concurrently would be OK?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, Review Comment: It might be good to think about a bound here and control how the LSM merge behaves based on that. I suggest not having files larger than 1GB, to ensure the merge process can run on lower-end VMs/machines. ########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. 
Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... _version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> Review Comment: what does this contain? Pointer to the latest manifest file? 
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flush as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> Review Comment: i.e., the list of all files in the entire LSM, correct?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flushed as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> + * </ol> + * + * <p><h2>The Reader Workflow</h2> + * <ul> + * <li>read the latest version;</li> + * <li>read the manifest file for valid file handles;</li> + * <li>probably do a data skipping with the parquet file name max min timestamp.</li> + * </ul> + * + * <p><h2>The Legacy Files Cleaning and Read Retention</h2> + * Only triggers file cleaning after a valid compaction. + * + * <p><h3>Clean Strategy</h3></p> Review Comment: Not following. Are you talking about cleaning of the Hudi data table?
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flushed as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> + * </ol> + * + * <p><h2>The Reader Workflow</h2> + * <ul> + * <li>read the latest version;</li> + * <li>read the manifest file for valid file handles;</li> + * <li>probably do a data skipping with the parquet file name max min timestamp.</li> + * </ul> + * + * <p><h2>The Legacy Files Cleaning and Read Retention</h2> + * Only triggers file cleaning after a valid compaction. + * + * <p><h3>Clean Strategy</h3></p> + * Keeps only 3 valid snapshot versions for the reader, that means, a file is kept for at least 3 archival trigger intervals, for default configuration, it is 30 instants time span, + * which is far longer than the archived timeline loading time. + * + * <p><h3>Instants TTL</h3></p> + * The timeline reader only reads instants of last 7 days. We will skip the instants from archived timeline that are generated 1 week ago. + * + * <p><h2>Timeline Refresh</h2> Review Comment: fair.
########## hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java: ########## @@ -18,75 +18,125 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; -import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan; -import org.apache.hudi.common.fs.HoodieWrapperFileSystem; -import org.apache.hudi.common.model.HoodieLogFile; -import org.apache.hudi.common.model.HoodiePartitionMetadata; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.avro.model.HoodieArchivedInstant; import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.log.HoodieLogFormat; -import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; -import org.apache.hudi.common.table.log.block.HoodieLogBlock; -import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.ArchivedInstantReadSchemas; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import 
java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.Spliterator; -import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.StreamSupport; +import java.util.stream.Collectors; /** - * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the - * ActiveTimeline and the rest are in ArchivedTimeline. - * <p> - * </p> - * Instants are read from the archive file during initialization and never refreshed. To refresh, clients need to call - * reload() - * <p> - * </p> - * This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized. + * Represents the Archived Timeline for the Hoodie table. + * + * <p>After several instants are accumulated as a batch on the active timeline, they would be archived as a parquet file into the archived timeline. + * In general the archived timeline is comprised with parquet files with LSM style file layout. Each new operation to the archived timeline generates + * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline. + * + * <p><h2>The Archived Timeline Layout</h2> + * + * <pre> + * t111, t112 ... t120 ... -> + * \ / + * \ / + * | + * V + * t111_t120_0.parquet, t101_t110_0.parquet,... t11_t20_0.parquet L0 + * \ / + * \ / + * | + * V + * t11_t100_1.parquet L1 + * + * manifest_1, manifest_2, ... manifest_12 + * | | | + * V V V + * _version_1, _version_2, ... 
_version_12 + * </pre> + * + * <p><h2>The LSM Tree Compaction</h2> + * Use the universal compaction strategy, that is: when N(by default 10) number of parquet files exist in the current layer, they are merged and flushed as a large file in the next layer. + * We have no limit for the layer number, assumes there are 10 instants for each file in L0, there could be 100 instants per-file in L1, + * so 3000 instants could be represented as 3 parquets in L2, it is pretty fast if we use concurrent read. + * + * <p>The benchmark shows 1000 instants read cost about 10 ms. + * + * <p><h2>The Archiver & Reader Snapshot Isolation</h2> + * + * <p>In order to make snapshot isolation of the archived timeline write/read, we add two kinds of metadata files for the LSM tree version management: + * <ol> + * <li>Manifest file: Each new file in layer 0 or each compaction would generate a new manifest file, the manifest file records the valid file handles of the latest snapshot;</li> + * <li>Version file: A version file is generated right after a complete manifest file is formed.</li> + * </ol> + * + * <p><h2>The Reader Workflow</h2> + * <ul> + * <li>read the latest version;</li> + * <li>read the manifest file for valid file handles;</li> + * <li>probably do a data skipping with the parquet file name max min timestamp.</li> + * </ul> + * + * <p><h2>The Legacy Files Cleaning and Read Retention</h2> + * Only triggers file cleaning after a valid compaction. + * + * <p><h3>Clean Strategy</h3></p> + * Keeps only 3 valid snapshot versions for the reader, that means, a file is kept for at least 3 archival trigger intervals, for default configuration, it is 30 instants time span, + * which is far longer than the archived timeline loading time. + * + * <p><h3>Instants TTL</h3></p> Review Comment: I'd prefer lazily loading it vs ignoring it completely. i.e., the LSM performs well if reading is within 1 week in the past, but should be correct always.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org