vinothchandar commented on code in PR #9209:
URL: https://github.com/apache/hudi/pull/9209#discussion_r1300866432


##########
hudi-common/src/main/avro/HoodieLSMTimelineInstant.avsc:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "type":"record",
+   "name":"HoodieLSMTimelineInstant",
+   "namespace":"org.apache.hudi.avro.model",
+   "fields":[
+      {
+         "name":"instantTime",
+         "type":["null","string"],
+         "default": null
+      },
+      {
+         "name":"completionTime",
+         "type":["null","string"],
+         "default": null
+      },
+      {
+         "name":"action",
+         "type":["null","string"],
+         "default": null
+      },
+      {
+         "name":"metadata",
+         "type":["null", "bytes"],
+         "default": null
+      },
+      {
+         "name":"plan",
+         "type":["null", "bytes"],

Review Comment:
   I am thinking of the scenario where users want to write SQL to query the 
timeline. If we store bytes, we probably need to provide UDFs for converting the 
bytes back into a plan? Follow-up JIRA? (I think this is still better than a 
nested schema, which can be expensive to write.)
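   Roughly what I have in mind is a Spark SQL UDF along these lines (a hedged 
sketch only; the `plan_to_json` name, the use of 
`TimelineMetadataUtils.deserializeAvroMetadata`, and `HoodieCompactionPlan` as the 
target type are assumptions on my part, not something this PR provides):

```java
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class PlanToJsonUdf {
  // Registers a UDF that decodes the plan bytes into a JSON string for ad-hoc SQL inspection.
  public static void register(SparkSession spark) {
    spark.udf().register("plan_to_json",
        (UDF1<byte[], String>) bytes -> {
          if (bytes == null) {
            return null;
          }
          // Assumption: the plan column holds Avro-serialized HoodieCompactionPlan bytes.
          HoodieCompactionPlan plan =
              TimelineMetadataUtils.deserializeAvroMetadata(bytes, HoodieCompactionPlan.class);
          // Avro specific records render as JSON via toString(), which is enough for inspection.
          return plan.toString();
        },
        DataTypes.StringType);
  }
}
```

   With that registered, something like `SELECT instantTime, plan_to_json(plan) FROM 
<timeline view>` becomes possible (the view name here is hypothetical).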



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ActiveAction.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;

Review Comment:
   Can we create an org.apache.hudi.client.timeline package and move all these 
classes there?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {
+            preWriteCallback.accept(activeAction);
+          }
+          // in local FS and HDFS, there could be empty completed instants due 
to crash.
+          final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createArchivedInstant(activeAction, metaClient);
+          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+        } catch (Exception e) {
+          LOG.error("Failed to write instant: " + 
activeAction.getInstantTime(), e);
+          if (this.config.isFailOnTimelineArchivingEnabled()) {

Review Comment:
   Can we move this error handling out of this class, e.g. let the caller decide 
whether a failure is fatal?
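   Purely illustrative sketch of what the calling archiver could look like (method 
and field names are mine, not this PR's API):

```java
// Sketch: the calling archiver owns the "fail on archiving error" decision,
// so LSMTimelineWriter just writes and propagates failures.
private void archiveBatch(LSMTimelineWriter timelineWriter, List<ActiveAction> actions) {
  try {
    timelineWriter.write(actions, null);
  } catch (HoodieCommitException e) {
    // 'config' and 'LOG' are assumed to be fields of the enclosing archiver class.
    if (config.isFailOnTimelineArchivingEnabled()) {
      throw e;
    }
    LOG.warn("Ignoring archiving failure since fail-on-timeline-archiving is disabled", e);
  }
}
```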



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java:
##########
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Tools used for migrating to the new LSM-tree-style archived timeline.
+ */
+public class LegacyArchivedMetaEntryReader {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LegacyArchivedMetaEntryReader.class);
+
+  private static final Pattern ARCHIVE_FILE_PATTERN =
+      Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
+
+  public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
+
+  private static final String ACTION_TYPE_KEY = "actionType";
+  private static final String ACTION_STATE = "actionState";
+  private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
+
+  private final HoodieTableMetaClient metaClient;
+
+  public LegacyArchivedMetaEntryReader(HoodieTableMetaClient metaClient) {
+    this.metaClient = metaClient;
+  }
+
+  public ClosableIterator<ActiveAction> getActiveActionsIterator() {
+    return loadInstants(null);
+  }
+
+  /**
+   * Reads the avro record for instant and details.
+   */
+  private Pair<HoodieInstant, Option<byte[]>> readInstant(GenericRecord 
record) {
+    final String instantTime = 
record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
+    final String action = record.get(ACTION_TYPE_KEY).toString();
+    final String stateTransitionTime = (String) 
record.get(STATE_TRANSITION_TIME);
+    final Option<byte[]> details = getMetadataKey(action).map(key -> {
+      Object actionData = record.get(key);
+      if (actionData != null) {
+        if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+          return HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) 
actionData);
+        } else {
+          return actionData.toString().getBytes(StandardCharsets.UTF_8);
+        }
+      }
+      return null;
+    });
+    HoodieInstant instant = new 
HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), 
action,
+        instantTime, stateTransitionTime);
+    return Pair.of(instant,details);
+  }
+
+  @Nonnull
+  private Option<String> getMetadataKey(String action) {
+    switch (action) {
+      case HoodieTimeline.CLEAN_ACTION:
+        return Option.of("hoodieCleanMetadata");

Review Comment:
   Are there any existing constants we can use for this? If not, ignore.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {
+            preWriteCallback.accept(activeAction);
+          }
+          // in local FS and HDFS, there could be empty completed instants due 
to crash.
+          final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createArchivedInstant(activeAction, metaClient);

Review Comment:
   Rename: remove any reference to "Archived".



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.

Review Comment:
   remove "archived"



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {

Review Comment:
   Can we use Option instead of `null` as the sentinel here?
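   For example (partial sketch only; the surrounding method body is elided):

```java
// Partial sketch: keep this PR's write() but make the callback an explicit
// org.apache.hudi.common.util.Option instead of a nullable Consumer.
public void write(List<ActiveAction> activeActions,
                  Option<Consumer<ActiveAction>> preWriteCallback) throws HoodieCommitException {
  // ... file path / writer setup unchanged ...
  // inside the per-action loop, replace the null check with:
  preWriteCallback.ifPresent(callback -> callback.accept(activeAction));
  // ... rest unchanged ...
}
```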



##########
hudi-common/src/main/avro/HoodieLSMTimelineInstant.avsc:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+   "type":"record",
+   "name":"HoodieLSMTimelineInstant",
+   "namespace":"org.apache.hudi.avro.model",
+   "fields":[

Review Comment:
   Let's add a version field to each record, so we can evolve the schema as we go 
if needed.
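   e.g. something along these lines as an extra field (the name and default value 
are just a suggestion):

```json
{
   "name": "version",
   "type": "int",
   "default": 1
}
```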



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {
+            preWriteCallback.accept(activeAction);
+          }
+          // in local FS and HDFS, there could be empty completed instants due 
to crash.
+          final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createArchivedInstant(activeAction, metaClient);
+          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+        } catch (Exception e) {
+          LOG.error("Failed to write instant: " + 
activeAction.getInstantTime(), e);
+          if (this.config.isFailOnTimelineArchivingEnabled()) {
+            throw e;
+          }
+        }
+      }
+      updateManifest(filePath.getName());
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to write commits", e);
+    }
+  }
+
+  public void updateManifest(String fileToAdd) throws IOException {
+    // 1. read the latest manifest version file;
+    // 2. read the latest manifest file for valid files;
+    // 3. add this new file to the existing file list from step2.
+    int latestVersion = LSMTimeline.latestSnapshotVersion(metaClient);
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, latestVersion);
+    HoodieLSMTimelineManifest newManifest = latestManifest.copy();
+    newManifest.addFile(getFileEntry(fileToAdd));
+    createManifestFile(newManifest, latestVersion);
+  }
+
+  public void updateManifest(List<String> filesToRemove, String fileToAdd) 
throws IOException {
+    // 1. read the latest manifest version file;

Review Comment:
   Multi-line comment, or move it into the method's Javadoc?
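   e.g. a sketch of the Javadoc form (wording is mine):

```java
/**
 * Updates the manifest by adding a new file and removing the replaced files:
 * <ol>
 *   <li>read the latest manifest version file;</li>
 *   <li>read the latest manifest file for the valid file list;</li>
 *   <li>remove the given files from that list;</li>
 *   <li>add the new file, then persist a new manifest and version file.</li>
 * </ol>
 */
public void updateManifest(List<String> filesToRemove, String fileToAdd) throws IOException {
  // ... body unchanged ...
}
```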



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {
+            preWriteCallback.accept(activeAction);
+          }
+          // in local FS and HDFS, there could be empty completed instants due 
to crash.
+          final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createArchivedInstant(activeAction, metaClient);
+          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+        } catch (Exception e) {
+          LOG.error("Failed to write instant: " + 
activeAction.getInstantTime(), e);
+          if (this.config.isFailOnTimelineArchivingEnabled()) {
+            throw e;
+          }
+        }
+      }
+      updateManifest(filePath.getName());
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to write commits", e);
+    }
+  }
+
+  public void updateManifest(String fileToAdd) throws IOException {
+    // 1. read the latest manifest version file;
+    // 2. read the latest manifest file for valid files;
+    // 3. add this new file to the existing file list from step2.
+    int latestVersion = LSMTimeline.latestSnapshotVersion(metaClient);
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, latestVersion);
+    HoodieLSMTimelineManifest newManifest = latestManifest.copy();
+    newManifest.addFile(getFileEntry(fileToAdd));
+    createManifestFile(newManifest, latestVersion);
+  }
+
+  public void updateManifest(List<String> filesToRemove, String fileToAdd) 
throws IOException {
+    // 1. read the latest manifest version file;
+    // 2. read the latest manifest file for valid files;
+    // 3. remove the given files from the existing file list from step 2;
+    // 4. add this new file to the existing file list from step2.
+    int latestVersion = LSMTimeline.latestSnapshotVersion(metaClient);
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, latestVersion);
+    HoodieLSMTimelineManifest newManifest = latestManifest.copy(filesToRemove);
+    newManifest.addFile(getFileEntry(fileToAdd));
+    createManifestFile(newManifest, latestVersion);
+  }
+
+  private void createManifestFile(HoodieLSMTimelineManifest manifest, int 
currentVersion) throws IOException {
+    byte[] content = manifest.toJsonString().getBytes(StandardCharsets.UTF_8);
+    // version starts from 1 and increases monotonically
+    int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
+    // create manifest file
+    final Path manifestFilePath = LSMTimeline.getManifestFilePath(metaClient, 
newVersion);
+    metaClient.getFs().createImmutableFileInPath(manifestFilePath, 
Option.of(content));
+    // update version file
+    updateVersionFile(newVersion);
+  }
+
+  private void updateVersionFile(int newVersion) throws IOException {
+    byte[] content = 
(String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8);
+    final Path versionFilePath = LSMTimeline.getVersionFilePath(metaClient);
+    metaClient.getFs().delete(versionFilePath, false);
+    metaClient.getFs().createImmutableFileInPath(versionFilePath, 
Option.of(content));
+  }
+
+  /**
+   * Compacts the small archive files.

Review Comment:
   Please look for all references to "archive" and clean them up.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -424,9 +159,7 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() 
throws IOException {
                   LESSER_THAN, 
oldestPendingInstant.get().getTimestamp())).findFirst());
       // Check if the completed instant is higher than the oldest inflight 
instant
       // in that case update the oldestCommitToRetain to oldestInflight commit 
time.
-      if (!completedCommitBeforeOldestPendingInstant.isPresent()
-          || 
HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
-          LESSER_THAN, 
completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
+      if (!completedCommitBeforeOldestPendingInstant.isPresent()) {

Review Comment:
   Why do we need this change? It may break something. Please revert if it is 
unnecessary for this change.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ArchivedInstantReadSchemas;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the LSM Timeline for the Hoodie table.
+ *
+ * <p>After several instants accumulate as a batch on the active timeline, they are archived as a parquet file into the archived timeline.
+ * In general the archived timeline is composed of parquet files laid out as an LSM tree. Each new operation on the archived timeline generates
+ * a new snapshot version. Theoretically, there could be multiple snapshot 
versions on the archived timeline.
+ *
+ * <p><h2>The Archived Timeline Layout</h2>
+ *
+ * <pre>
+ *   t111, t112 ... t120 ... ->
+ *     \              /
+ *        \        /
+ *            |
+ *            V
+ *   t111_t120_0.parquet, t101_t110_0.parquet,...  t11_t20_0.parquet    L0
+ *                                  \                    /
+ *                                     \              /
+ *                                            |
+ *                                            V
+ *                                    t11_t100_1.parquet                L1
+ *
+ *      manifest_1, manifest_2, ... manifest_12
+ *                                      |
+ *                                      V
+ *                                  _version_
+ * </pre>
+ *
+ * <p><h2>The LSM Tree Compaction</h2>
+ * Uses the universal compaction strategy, that is: when N (by default 10) parquet files exist in the current layer, they are merged and flushed
+ * as a compacted file into the next layer. There is no limit on the number of layers; assuming 10 instants per file in L0,
+ * there are about 100 instants per file in L1, so 3000 instants can be represented as 3 parquet files in L2, which is quick to load with concurrent reads.
+ *
+ * <p>A benchmark shows that reading 1000 instants costs about 10 ms.
+ *
+ * <p><h2>The Archiver & Reader Snapshot Isolation</h2>
+ *
+ * <p>In order to provide snapshot isolation between archived timeline writes and reads, we add two kinds of metadata files for LSM tree version management:
+ * <ol>
+ *   <li>Manifest file: each new file in layer 0 and each compaction generates a new manifest file; the manifest file records the valid file handles of the latest snapshot;</li>
+ *   <li>Version file: A version file is generated right after a complete 
manifest file is formed.</li>
+ * </ol>
+ *
+ * <p><h2>The Reader Workflow</h2>
+ * <ul>
+ *   <li>read the latest version;</li>
+ *   <li>read the manifest file for valid file handles;</li>
+ *   <li>optionally do data skipping using the min/max instant timestamps encoded in the parquet file names.</li>
+ * </ul>
+ *
+ * <p><h2>The Legacy Files Cleaning and Read Retention</h2>
+ * File cleaning is only triggered after a valid compaction.
+ *
+ * <p><h3>Clean Strategy</h3></p>
+ * Keeps only 3 valid snapshot versions for the reader; that means a file is kept for at least 3 archival trigger intervals, which with the default configuration is a 30-instant time span,
+ * far longer than the archived timeline loading time.
+ *
+ * <p><h3>Instants TTL</h3></p>
+ * The timeline reader only reads instants from a limited number of recent days; by default we skip instants on the archived timeline that were generated a long time ago.
+ */
+public class LSMTimeline {
+  private static final Logger LOG = LoggerFactory.getLogger(LSMTimeline.class);
+
+  private static final String VERSION_FILE_NAME = "_version_";    // _version_
+  private static final String MANIFEST_FILE_PREFIX = "manifest_"; // 
manifest_[N]
+
+  private static final String TEMP_FILE_SUFFIX = ".tmp";
+
+  private static final Pattern ARCHIVE_FILE_PATTERN =
+      Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static Schema getReadSchema(HoodieArchivedTimeline.LoadMode loadMode) 
{
+    switch (loadMode) {
+      case SLIM:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_SLIM_READ_SCHEMA;
+      case METADATA:
+        return 
ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_METADATA;
+      case PLAN:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_PLAN;
+      default:
+        throw new AssertionError("Unexpected");
+    }
+  }
+
+  /**
+   * Returns whether the given file's instant time range overlaps the given filter.
+   */
+  public static boolean isFileInRange(HoodieArchivedTimeline.TimeRangeFilter 
filter, String fileName) {
+    String minInstant = getMinInstantTime(fileName);
+    String maxInstant = getMaxInstantTime(fileName);
+    return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
+  }
+
+  /**
+   * Returns the latest snapshot version.
+   */
+  public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) 
throws IOException {
+    Path versionFilePath = getVersionFilePath(metaClient);
+    if (metaClient.getFs().exists(versionFilePath)) {
+      try {
+        Option<byte[]> content = 
FileIOUtils.readDataFromPath(metaClient.getFs(), versionFilePath);
+        if (content.isPresent()) {
+          return Integer.parseInt(new String(content.get(), 
StandardCharsets.UTF_8));
+        }
+      } catch (Exception e) {
+        // fallback to manifest file listing.
+        LOG.warn("Error reading version file {}", versionFilePath, e);
+      }
+    }
+    return 
allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
+  }
+
+  /**
+   * Returns all the valid snapshot versions.
+   */
+  public static List<Integer> allSnapshotVersions(HoodieTableMetaClient 
metaClient) throws IOException {
+    return Arrays.stream(metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter()))
+        .map(fileStatus -> fileStatus.getPath().getName())
+        .map(LSMTimeline::getManifestVersion)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the latest snapshot metadata files.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient) throws IOException {
+    int latestVersion = latestSnapshotVersion(metaClient);
+    return latestSnapshotManifest(metaClient, latestVersion);
+  }
+
+  /**
+   * Reads the file list from the manifest file for the latest snapshot.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient, int latestVersion) {
+    if (latestVersion < 0) {
+      // there is no valid snapshot of the timeline.
+      return HoodieLSMTimelineManifest.EMPTY;
+    }
+    // read and deserialize the valid files.
+    byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), 
getManifestFilePath(metaClient, latestVersion)).get();
+    try {
+      return HoodieLSMTimelineManifest.fromJsonString(new String(content, 
StandardCharsets.UTF_8), HoodieLSMTimelineManifest.class);
+    } catch (Exception e) {
+      throw new HoodieException("Error deserializing manifest entries", e);
+    }
+  }
+
+  /**
+   * Returns the full manifest file path with given version number.
+   */
+  public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int 
snapshotVersion) {
+    return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + 
snapshotVersion);
+  }
+
+  /**
+   * Returns the full version file path with given version number.
+   */
+  public static Path getVersionFilePath(HoodieTableMetaClient metaClient) {
+    return new Path(metaClient.getArchivePath(), VERSION_FILE_NAME);
+  }
+
+  /**
+   * List all the parquet manifest files.
+   */
+  public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter());
+  }
+
+  /**
+   * List all the parquet metadata files.
+   */
+  public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/*.parquet"));
+  }
+
+  /**
+   * Parse the snapshot version from the manifest file name.
+   */
+  public static int getManifestVersion(String fileName) {
+    return Integer.parseInt(fileName.split("_")[1]);
+  }
+
+  /**
+   * Parse the layer number from the file name.
+   */
+  public static int getFileLayer(String fileName) {
+    try {
+      Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+      if (fileMatcher.matches()) {
+        return Integer.parseInt(fileMatcher.group(3));
+      }
+    } catch (NumberFormatException e) {
+      // log and ignore any format warnings
+      LOG.warn("error getting file layout for archived file: " + fileName);
+    }
+
+    // return default value in case of any errors
+    return 0;
+  }
+
+  /**
+   * Parse the minimum instant time from the file name.
+   */
+  public static String getMinInstantTime(String fileName) {
+    Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+    if (fileMatcher.matches()) {
+      return fileMatcher.group(1);
+    } else {
+      throw new HoodieException("Unexpected archival file name: " + fileName);
+    }
+  }
+
+  /**
+   * Parse the maximum instant time from the file name.
+   */
+  public static String getMaxInstantTime(String fileName) {

Review Comment:
   Do these methods have unit tests?
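   A minimal sketch of the kind of coverage I mean (test class name and values are 
placeholders):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.hudi.common.table.timeline.LSMTimeline;
import org.junit.jupiter.api.Test;

public class TestLSMTimelineFileNaming {
  @Test
  public void testParseFileName() {
    // File name convention from this PR: ${min_instant}_${max_instant}_${level}.parquet
    String fileName = "001_010_0.parquet";
    assertEquals("001", LSMTimeline.getMinInstantTime(fileName));
    assertEquals("010", LSMTimeline.getMaxInstantTime(fileName));
    assertEquals(0, LSMTimeline.getFileLayer(fileName));
  }
}
```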



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LSMTimelineWriter.java:
##########
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.LSMTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * An archived timeline writer which organizes the files as an LSM tree.
+ */
+public class LSMTimelineWriter {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LSMTimelineWriter.class);
+
+  public static final int FILE_LAYER_ZERO = 0;
+
+  public static final long MAX_FILE_SIZE_IN_BYTES = 1024 * 1024 * 1000;
+
+  private final HoodieWriteConfig config;
+  private final HoodieTable<?, ?, ?, ?> table;
+  private final HoodieTableMetaClient metaClient;
+
+  private HoodieWriteConfig writeConfig;
+
+  private LSMTimelineWriter(HoodieWriteConfig config, HoodieTable<?, ?, ?, ?> 
table) {
+    this.config = config;
+    this.table = table;
+    this.metaClient = table.getMetaClient();
+  }
+
+  public static LSMTimelineWriter getInstance(HoodieWriteConfig config, 
HoodieTable<?, ?, ?, ?> table) {
+    return new LSMTimelineWriter(config, table);
+  }
+
+  public void write(List<ActiveAction> activeActions, Consumer<ActiveAction> 
preWriteCallback) throws HoodieCommitException {
+    ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
+    Path filePath = new Path(metaClient.getArchivePath(),
+        newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try (HoodieFileWriter writer = openWriter(filePath)) {
+      Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
+      LOG.info("Writing schema " + wrapperSchema.toString());
+      for (ActiveAction activeAction : activeActions) {
+        try {
+          if (preWriteCallback != null) {
+            preWriteCallback.accept(activeAction);
+          }
+          // in local FS and HDFS, there could be empty completed instants due 
to crash.
+          final HoodieLSMTimelineInstant metaEntry = 
MetadataConversionUtils.createArchivedInstant(activeAction, metaClient);
+          writer.write(metaEntry.getInstantTime(), new 
HoodieAvroIndexedRecord(metaEntry), wrapperSchema);
+        } catch (Exception e) {
+          LOG.error("Failed to write instant: " + 
activeAction.getInstantTime(), e);
+          if (this.config.isFailOnTimelineArchivingEnabled()) {
+            throw e;
+          }
+        }
+      }
+      updateManifest(filePath.getName());
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to write commits", e);
+    }
+  }
+
+  public void updateManifest(String fileToAdd) throws IOException {
+    // 1. read the latest manifest version file;
+    // 2. read the latest manifest file for valid files;
+    // 3. add this new file to the existing file list from step2.
+    int latestVersion = LSMTimeline.latestSnapshotVersion(metaClient);
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, latestVersion);
+    HoodieLSMTimelineManifest newManifest = latestManifest.copy();
+    newManifest.addFile(getFileEntry(fileToAdd));
+    createManifestFile(newManifest, latestVersion);
+  }
+
+  public void updateManifest(List<String> filesToRemove, String fileToAdd) 
throws IOException {
+    // 1. read the latest manifest version file;
+    // 2. read the latest manifest file for valid files;
+    // 3. remove the given files from the existing file list from step 2;
+    // 4. add this new file to the existing file list from step2.
+    int latestVersion = LSMTimeline.latestSnapshotVersion(metaClient);
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, latestVersion);
+    HoodieLSMTimelineManifest newManifest = latestManifest.copy(filesToRemove);
+    newManifest.addFile(getFileEntry(fileToAdd));
+    createManifestFile(newManifest, latestVersion);
+  }
+
+  private void createManifestFile(HoodieLSMTimelineManifest manifest, int 
currentVersion) throws IOException {
+    byte[] content = manifest.toJsonString().getBytes(StandardCharsets.UTF_8);
+    // version starts from 1 and increases monotonically
+    int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
+    // create manifest file
+    final Path manifestFilePath = LSMTimeline.getManifestFilePath(metaClient, 
newVersion);
+    metaClient.getFs().createImmutableFileInPath(manifestFilePath, 
Option.of(content));
+    // update version file
+    updateVersionFile(newVersion);
+  }
+
+  private void updateVersionFile(int newVersion) throws IOException {
+    byte[] content = 
(String.valueOf(newVersion)).getBytes(StandardCharsets.UTF_8);
+    final Path versionFilePath = LSMTimeline.getVersionFilePath(metaClient);
+    metaClient.getFs().delete(versionFilePath, false);
+    metaClient.getFs().createImmutableFileInPath(versionFilePath, 
Option.of(content));
+  }
+
+  /**
+   * Compacts the small archive files.
+   *
+   * <p>The parquet naming convention is:
+   *
+   * <pre>${min_instant}_${max_instant}_${level}.parquet</pre>
+   *
+   * <p>The 'min_instant' and 'max_instant' represent the instant time range of the parquet file.
+   * The 'level' is the number of the LSM layer where the file is located; currently there is
+   * no limit on the number of layers.
+   *
+   * <p>Together these archive parquet files form an LSM-tree layout: each parquet file contains
+   * metadata entries for a consecutive range of instant times, and different parquet files may
+   * have overlapping instant time ranges.
+   *
+   * <pre>
+   *   t1_t2_0.parquet, t3_t4_0.parquet, ... t5_t6_0.parquet       L0 layer
+   *                          \            /
+   *                             \     /
+   *                                |
+   *                                V
+   *                          t3_t6_1.parquet                      L1 layer
+   * </pre>
+   *
+   * <p>Compaction and cleaning: once the number of files in a layer exceeds a threshold N (currently a constant 10),
+   * the oldest N files are replaced with a compacted file in the next layer.
+   * A cleaning action is triggered right after the compaction.
+   *
+   * @param context HoodieEngineContext
+   */
+  @VisibleForTesting
+  public void compactAndClean(HoodieEngineContext context) throws IOException {
+    // 1. List all the latest snapshot files
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient);
+    int layer = 0;
+    // 2. triggers the compaction for L0
+    Option<String> compactedFileName = doCompact(latestManifest, layer);
+    while (compactedFileName.isPresent()) {
+      // 3. once a compaction had been executed for the current layer,
+      // continues to trigger compaction for the next layer.
+      latestManifest.addFile(getFileEntry(compactedFileName.get()));
+      compactedFileName = doCompact(latestManifest, ++layer);
+    }
+
+    // cleaning
+    clean(context, layer);
+  }
+
+  private Option<String> doCompact(HoodieLSMTimelineManifest manifest, int 
layer) throws IOException {
+    // 1. list all the files that belong to current layer
+    List<HoodieLSMTimelineManifest.LSMFileEntry> files = manifest.getFiles()
+        .stream().filter(file -> 
LSMTimeline.isFileFromLayer(file.getFileName(), 
layer)).collect(Collectors.toList());
+
+    int compactionBatchSize = config.getTimelineCompactionBatchSize();
+
+    if (files.size() >= compactionBatchSize) {
+      // 2. sort files by min instant time (implies ascending chronological 
order)
+      files.sort(HoodieLSMTimelineManifest.LSMFileEntry::compareTo);
+      List<String> candidateFiles = getCandidateFiles(files, 
compactionBatchSize);
+      if (candidateFiles.size() < 2) {
+        // the first candidate file is already too large to compact with others; return early.
+        return Option.empty();
+      }
+      String compactedFileName = compactedFileName(candidateFiles);
+
+      // 3. compaction
+      compactFiles(candidateFiles, compactedFileName);
+      // 4. update the manifest file
+      updateManifest(candidateFiles, compactedFileName);
+      LOG.info("Finishes compaction of archive files: " + candidateFiles);
+      return Option.of(compactedFileName);
+    }
+    return Option.empty();
+  }
+
+  public void compactFiles(List<String> candidateFiles, String 
compactedFileName) {
+    LOG.info("Starting to merge small archive files.");
+    try (HoodieFileWriter writer = openWriter(new 
Path(metaClient.getArchivePath(), compactedFileName))) {
+      for (String fileName : candidateFiles) {
+        // Read the archived file
+        try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) 
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+            .getFileReader(metaClient.getHadoopConf(), new 
Path(metaClient.getArchivePath(), fileName))) {
+          // Read the meta entry
+          try (ClosableIterator<IndexedRecord> iterator = 
reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), 
HoodieLSMTimelineInstant.getClassSchema())) {
+            while (iterator.hasNext()) {
+              IndexedRecord record = iterator.next();
+              writer.write(record.get(0).toString(), new 
HoodieAvroIndexedRecord(record), HoodieLSMTimelineInstant.getClassSchema());
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieCommitException("Failed to merge small archive files", 
e);
+    }
+    LOG.info("Success to merge small archive files.");
+  }
+
+  /**
+   * Cleans up the expired snapshot versions, keeping only the most recent ones for readers.
+   *
+   * @param context           HoodieEngineContext used to parallelize the deletion of expired archive files if necessary.
+   * @param compactedVersions Number of snapshot versions that were just generated by compaction.
+   */
+  public void clean(HoodieEngineContext context, int compactedVersions) throws 
IOException {
+    // if there are more than 3 versions of snapshots, clean the oldest files.
+    List<Integer> allSnapshotVersions = 
LSMTimeline.allSnapshotVersions(metaClient);
+    int numVersionsToKeep = 3 + compactedVersions; // should make the 
threshold configurable.
+    if (allSnapshotVersions.size() > numVersionsToKeep) {
+      allSnapshotVersions.sort((v1, v2) -> v2 - v1);
+      List<Integer> versionsToKeep = allSnapshotVersions.subList(0, 
numVersionsToKeep);
+      Set<String> filesToKeep = versionsToKeep.stream()
+          .flatMap(version -> LSMTimeline.latestSnapshotManifest(metaClient, 
version).getFileNames().stream())
+          .collect(Collectors.toSet());
+      // delete the manifest file first
+      List<String> manifestFilesToClean = new ArrayList<>();
+      Arrays.stream(LSMTimeline.listAllManifestFiles(metaClient)).forEach(fileStatus -> {
+        if (!versionsToKeep.contains(LSMTimeline.getManifestVersion(fileStatus.getPath().getName()))) {
+          manifestFilesToClean.add(fileStatus.getPath().toString());
+        }
+      });
+      FSUtils.deleteFilesParallelize(metaClient, manifestFilesToClean, 
context, config.getArchiveDeleteParallelism(), false);
+      // delete the archive data files
+      List<String> dataFilesToClean = 
Arrays.stream(LSMTimeline.listAllMetaFiles(metaClient))
+          .filter(fileStatus -> 
!filesToKeep.contains(fileStatus.getPath().getName()))
+          .map(fileStatus -> fileStatus.getPath().toString())
+          .collect(Collectors.toList());
+      FSUtils.deleteFilesParallelize(metaClient, dataFilesToClean, context, 
config.getArchiveDeleteParallelism(), false);
+    }
+  }
+
+  private HoodieLSMTimelineManifest.LSMFileEntry getFileEntry(String fileName) 
throws IOException {
+    long fileLen = metaClient.getFs().getFileStatus(new 
Path(metaClient.getArchivePath(), fileName)).getLen();
+    return HoodieLSMTimelineManifest.LSMFileEntry.getInstance(fileName, 
fileLen);
+  }
+
+  /**
+   * Returns at most {@code filesBatch} source files,
+   * with the gross file size restricted to 1GB.
+   */
+  private List<String> getCandidateFiles(List<HoodieLSMTimelineManifest.LSMFileEntry> files, int filesBatch) throws IOException {
+    List<String> candidates = new ArrayList<>();
+    long totalFileLen = 0L;
+    for (int i = 0; i < filesBatch; i++) {
+      HoodieLSMTimelineManifest.LSMFileEntry fileEntry = files.get(i);
+      if (totalFileLen > MAX_FILE_SIZE_IN_BYTES) {
+        return candidates;
+      }
+      // we may also need to consider a single file that is very close to the threshold in size,
+      // to avoid write amplification,
+      // e.g., two 800MB files compacting into a 1.6GB file.
+      totalFileLen += fileEntry.getFileLen();
+      candidates.add(fileEntry.getFileName());
+    }
+    return candidates;
+  }
+
+  /**
+   * Returns a new file name.
+   */
+  private static String newFileName(String minInstant, String maxInstant, int layer) {
+    return minInstant + "_" + maxInstant + "_" + layer + HoodieFileFormat.PARQUET.getFileExtension();

Review Comment:
   String.format?
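   
   For example, a rough sketch of that variant (same helper name and parameters, untested):
   
   ```java
   private static String newFileName(String minInstant, String maxInstant, int layer) {
     // equivalent to the concatenation above, just expressed as a format string
     return String.format("%s_%s_%d%s", minInstant, maxInstant, layer,
         HoodieFileFormat.PARQUET.getFileExtension());
   }
   ```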



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/LSMTimeline.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.model.HoodieLSMTimelineManifest;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ArchivedInstantReadSchemas;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Represents the LSM Timeline for the Hoodie table.
+ *
+ * <p>After several instants are accumulated as a batch on the active timeline, they are archived as a parquet file into the archived timeline.
+ * In general the archived timeline is composed of parquet files with an LSM-style file layout. Each new operation on the archived timeline generates
+ * a new snapshot version. Theoretically, there could be multiple snapshot versions on the archived timeline.
+ *
+ * <p><h2>The Archived Timeline Layout</h2>
+ *
+ * <pre>
+ *   t111, t112 ... t120 ... ->
+ *     \              /
+ *        \        /
+ *            |
+ *            V
+ *   t111_t120_0.parquet, t101_t110_0.parquet,...  t11_t20_0.parquet    L0
+ *                                  \                    /
+ *                                     \              /
+ *                                            |
+ *                                            V
+ *                                    t11_t100_1.parquet                L1
+ *
+ *      manifest_1, manifest_2, ... manifest_12
+ *                                      |
+ *                                      V
+ *                                  _version_
+ * </pre>
+ *
+ * <p><h2>The LSM Tree Compaction</h2>
+ * The universal compaction strategy is used, that is: when N (by default 10) parquet files exist in the current layer, they are merged and flushed as a compacted file in the next layer.
+ * There is no limit on the number of layers. Assuming there are 10 instants per file in L0, there could be 100 instants per file in L1,
+ * so 3000 instants could be represented as just 3 parquet files in L2; reads are fast when done concurrently.
+ *
+ * <p>A benchmark shows that reading 1000 instants costs about 10 ms.
+ *
+ * <p><h2>The Archiver & Reader Snapshot Isolation</h2>
+ *
+ * <p>In order to provide snapshot isolation for archived timeline writes and reads, we add two kinds of metadata files for the LSM tree version management:
+ * <ol>
+ *   <li>Manifest file: Each new file in layer 0 and each compaction generates a new manifest file; the manifest file records the valid file handles of the latest snapshot;</li>
+ *   <li>Version file: A version file is generated right after a complete manifest file is formed.</li>
+ * </ol>
+ *
+ * <p><h2>The Reader Workflow</h2>
+ * <ul>
+ *   <li>read the latest version;</li>
+ *   <li>read the manifest file for valid file handles;</li>
+ *   <li>optionally perform data skipping based on the min/max instant times encoded in the parquet file names.</li>
+ * </ul>
+ *
+ * <p><h2>The Legacy Files Cleaning and Read Retention</h2>
+ * File cleaning is only triggered after a valid compaction.
+ *
+ * <p><h3>Clean Strategy</h3></p>
+ * Only 3 valid snapshot versions are kept for the reader; that means a file is kept for at least 3 archival trigger intervals, which under the default configuration is a time span of 30 instants,
+ * far longer than the archived timeline loading time.
+ *
+ * <p><h3>Instants TTL</h3></p>
+ * The timeline reader only reads instants from a limited number of recent days; by default, instants generated a long time ago are skipped when loading the archived timeline.
+ */
+public class LSMTimeline {
+  private static final Logger LOG = LoggerFactory.getLogger(LSMTimeline.class);
+
+  private static final String VERSION_FILE_NAME = "_version_";    // _version_
+  private static final String MANIFEST_FILE_PREFIX = "manifest_"; // 
manifest_[N]
+
+  private static final String TEMP_FILE_SUFFIX = ".tmp";
+
+  private static final Pattern ARCHIVE_FILE_PATTERN =
+      Pattern.compile("^(\\d+)_(\\d+)_(\\d)\\.parquet");
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static Schema getReadSchema(HoodieArchivedTimeline.LoadMode loadMode) 
{
+    switch (loadMode) {
+      case SLIM:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_SLIM_READ_SCHEMA;
+      case METADATA:
+        return 
ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_METADATA;
+      case PLAN:
+        return ArchivedInstantReadSchemas.TIMELINE_LSM_READ_SCHEMA_WITH_PLAN;
+      default:
+        throw new AssertionError("Unexpected");
+    }
+  }
+
+  /**
+   * Returns whether the given file's instant time range overlaps with the given time range filter.
+   */
+  public static boolean isFileInRange(HoodieArchivedTimeline.TimeRangeFilter 
filter, String fileName) {
+    String minInstant = getMinInstantTime(fileName);
+    String maxInstant = getMaxInstantTime(fileName);
+    return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
+  }
+
+  /**
+   * Returns the latest snapshot version.
+   */
+  public static int latestSnapshotVersion(HoodieTableMetaClient metaClient) 
throws IOException {
+    Path versionFilePath = getVersionFilePath(metaClient);
+    if (metaClient.getFs().exists(versionFilePath)) {
+      try {
+        Option<byte[]> content = 
FileIOUtils.readDataFromPath(metaClient.getFs(), versionFilePath);
+        if (content.isPresent()) {
+          return Integer.parseInt(new String(content.get(), 
StandardCharsets.UTF_8));
+        }
+      } catch (Exception e) {
+        // fallback to manifest file listing.
+        LOG.warn("Error reading version file {}", versionFilePath, e);
+      }
+    }
+    return 
allSnapshotVersions(metaClient).stream().max(Integer::compareTo).orElse(-1);
+  }
+
+  /**
+   * Returns all the valid snapshot versions.
+   */
+  public static List<Integer> allSnapshotVersions(HoodieTableMetaClient 
metaClient) throws IOException {
+    return Arrays.stream(metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter()))
+        .map(fileStatus -> fileStatus.getPath().getName())
+        .map(LSMTimeline::getManifestVersion)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the manifest of the latest snapshot.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient) throws IOException {
+    int latestVersion = latestSnapshotVersion(metaClient);
+    return latestSnapshotManifest(metaClient, latestVersion);
+  }
+
+  /**
+   * Reads the file list from the manifest file for the latest snapshot.
+   */
+  public static HoodieLSMTimelineManifest 
latestSnapshotManifest(HoodieTableMetaClient metaClient, int latestVersion) {
+    if (latestVersion < 0) {
+      // there is no valid snapshot of the timeline.
+      return HoodieLSMTimelineManifest.EMPTY;
+    }
+    // read and deserialize the valid files.
+    byte[] content = FileIOUtils.readDataFromPath(metaClient.getFs(), 
getManifestFilePath(metaClient, latestVersion)).get();
+    try {
+      return HoodieLSMTimelineManifest.fromJsonString(new String(content, 
StandardCharsets.UTF_8), HoodieLSMTimelineManifest.class);
+    } catch (Exception e) {
+      throw new HoodieException("Error deserializing manifest entries", e);
+    }
+  }
+
+  /**
+   * Returns the full manifest file path with given version number.
+   */
+  public static Path getManifestFilePath(HoodieTableMetaClient metaClient, int 
snapshotVersion) {
+    return new Path(metaClient.getArchivePath(), MANIFEST_FILE_PREFIX + 
snapshotVersion);
+  }
+
+  /**
+   * Returns the full path of the version file.
+   */
+  public static Path getVersionFilePath(HoodieTableMetaClient metaClient) {
+    return new Path(metaClient.getArchivePath(), VERSION_FILE_NAME);
+  }
+
+  /**
+   * Lists all the manifest files.
+   */
+  public static FileStatus[] listAllManifestFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().listStatus(new 
Path(metaClient.getArchivePath()), getManifestFilePathFilter());
+  }
+
+  /**
+   * List all the parquet metadata files.
+   */
+  public static FileStatus[] listAllMetaFiles(HoodieTableMetaClient 
metaClient) throws IOException {
+    return metaClient.getFs().globStatus(
+        new Path(metaClient.getArchivePath() + "/*.parquet"));
+  }
+
+  /**
+   * Parse the snapshot version from the manifest file name.
+   */
+  public static int getManifestVersion(String fileName) {
+    return Integer.parseInt(fileName.split("_")[1]);
+  }
+
+  /**
+   * Parse the layer number from the file name.
+   */
+  public static int getFileLayer(String fileName) {
+    try {
+      Matcher fileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
+      if (fileMatcher.matches()) {
+        return Integer.parseInt(fileMatcher.group(3));
+      }
+    } catch (NumberFormatException e) {
+      // log and ignore any format warnings
+      LOG.warn("error getting file layout for archived file: " + fileName);
+    }
+
+    // return default value in case of any errors
+    return 0;
+  }
+
+  /**
+   * Parse the minimum instant time from the file name.
+   */
+  public static String getMinInstantTime(String fileName) {

Review Comment:
   instead of individual parsing methods, can we introduce a POJO here, with getters? i.e.
   
   LSMFile class with min, max, level as fields?
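   
   For illustration only, a rough sketch of such a POJO (the class name `LSMFileName` and the `parse` helper are placeholders, not part of this PR; the layer group is widened to multiple digits here just for the sketch):
   
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   /** Parsed view of an archived file name: ${min_instant}_${max_instant}_${level}.parquet */
   public final class LSMFileName {
     private static final Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("^(\\d+)_(\\d+)_(\\d+)\\.parquet$");
   
     private final String minInstant;
     private final String maxInstant;
     private final int layer;
   
     private LSMFileName(String minInstant, String maxInstant, int layer) {
       this.minInstant = minInstant;
       this.maxInstant = maxInstant;
       this.layer = layer;
     }
   
     /** Parses the file name, throwing if it does not match the archive naming convention. */
     public static LSMFileName parse(String fileName) {
       Matcher matcher = ARCHIVE_FILE_PATTERN.matcher(fileName);
       if (!matcher.matches()) {
         throw new IllegalArgumentException("Not an archived timeline file: " + fileName);
       }
       return new LSMFileName(matcher.group(1), matcher.group(2), Integer.parseInt(matcher.group(3)));
     }
   
     public String getMinInstant() {
       return minInstant;
     }
   
     public String getMaxInstant() {
       return maxInstant;
     }
   
     public int getLayer() {
       return layer;
     }
   }
   ```
   
   Callers like `isFileInRange` and `isFileFromLayer` could then parse the name once and reuse the getters instead of re-matching the pattern per field.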



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
