xushiyan commented on code in PR #5837:
URL: https://github.com/apache/hudi/pull/5837#discussion_r928191163


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java:
##########
@@ -92,6 +93,13 @@ public class HoodieArchivalConfig extends HoodieConfig {
       .withDocumentation("When enable, hoodie will auto merge several small 
archive files into larger one. It's"
           + " useful when storage scheme doesn't support append operation.");
 
+  public static final ConfigProperty<Boolean> ARCHIVE_BEYOND_SAVEPOINT = 
ConfigProperty
+      .key("hoodie.archive.proceed.savepoint")

Review Comment:
   not sure why you don't call it `hoodie.archive.beyond.savepoint`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -409,9 +412,11 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() 
{
             
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.DELTA_COMMIT_ACTION))
             .filterInflights().firstInstant();
 
-    // We cannot have any holes in the commit timeline. We cannot archive any 
commits which are
-    // made after the first savepoint present.
+    // NOTE: We cannot have any holes in the commit timeline.
+    // We cannot archive any commits which are made after the first savepoint 
present,
+    // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
     Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
+    List<String> savepointTimestamps = table.getSavepointTimestamps();

Review Comment:
   this is mainly used for `contains()` check. so it should be a `Set`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -479,26 +489,48 @@ private Stream<HoodieInstant> getInstantsToArchive() {
           instants = Stream.empty();
         } else {
           LOG.info("Limiting archiving of instants to latest compaction on 
metadata table at " + latestCompactionTime.get());
-          instants = instants.filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN,
+          instants = instants.filter(instant -> 
compareTimestamps(instant.getTimestamp(), LESSER_THAN,
               latestCompactionTime.get()));
         }
       } catch (Exception e) {
         throw new HoodieException("Error limiting instant archival based on 
metadata table", e);
       }
     }
 
-    // If this is a metadata table, do not archive the commits that live in 
data set
-    // active timeline. This is required by metadata table,
-    // see HoodieTableMetadataUtil#processRollbackMetadata for details.
     if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
           
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
           .setConf(metaClient.getHadoopConf())
           .build();
-      Option<String> earliestActiveDatasetCommit = 
dataMetaClient.getActiveTimeline().firstInstant().map(HoodieInstant::getTimestamp);
-      if (earliestActiveDatasetCommit.isPresent()) {
-        instants = instants.filter(instant ->
-            HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN, earliestActiveDatasetCommit.get()));
+      Option<HoodieInstant> earliestActiveDatasetCommit = 
dataMetaClient.getActiveTimeline().firstInstant();
+
+      if (config.shouldArchiveBeyondSavepoint()) {
+        // There are chances that there could be holes in the timeline due to 
archival and savepoint interplay.
+        // So, the first non-savepoint commit in the data timeline is 
considered as beginning of the active timeline.
+        Set<String> savepointTimestamps = 
dataMetaClient.getActiveTimeline().getInstants()
+            .filter(entry -> 
entry.getAction().equals(HoodieTimeline.SAVEPOINT_ACTION))
+            .map(HoodieInstant::getTimestamp)
+            .collect(Collectors.toSet());
+        Option<HoodieInstant> firstNonSavepointCommit = 
earliestActiveDatasetCommit;
+
+        if (!savepointTimestamps.isEmpty()) {
+          firstNonSavepointCommit = 
Option.fromJavaOptional(dataMetaClient.getActiveTimeline().getInstants()
+              .filter(entry -> 
!savepointTimestamps.contains(entry.getTimestamp()))
+              .findFirst());
+        }

Review Comment:
   this logic to find first non savepoint commit reused in hoodie default 
timeline. can be extracted out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to