nsivabalan commented on code in PR #8837:
URL: https://github.com/apache/hudi/pull/8837#discussion_r1243081122


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
-    if (enabled && metadata != null) {
-      // Is this rollback of an instant that has been synced to the metadata 
table?
-      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
-      boolean wasSynced = 
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
-      if (!wasSynced) {
-        // A compaction may have taken place on metadata table which would 
have included this instant being rolled back.
-        // Revisit this logic to relax the compaction fencing : 
https://issues.apache.org/jira/browse/HUDI-2458
-        Option<String> latestCompaction = metadata.getLatestCompactionTime();
-        if (latestCompaction.isPresent()) {
-          wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
-        }
+    // The commit which is being rolled back on the dataset
+    final String commitInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+    // Find the deltacommits since the last compaction
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+    if (!deltaCommitsInfo.isPresent()) {
+      LOG.info(String.format("Ignoring rollback of instant %s at %s since 
there are no deltacommits on MDT", commitInstantTime, instantTime));
+      return;
+    }
+
+    // This could be a compaction or deltacommit instant (See 
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+    HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+    HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey();
+
+    // The deltacommit that will be rolled back
+    HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime);
+
+    // The commit being rolled back should not be older than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+    // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+      final String compactionInstantTime = compactionInstant.getTimestamp();
+      if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, 
compactionInstantTime)) {
+        throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is older than the latest compaction %s. "
+                + "There are %d deltacommits after this compaction: %s", 
commitInstantTime, compactionInstantTime,
+            deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
       }
+    }
 
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
-          HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
metadataMetaClient.getActiveTimeline(),
-              rollbackMetadata, getRecordsGenerationParams(), instantTime,
-              metadata.getSyncedInstantTime(), wasSynced);
-      commit(instantTime, records, false);
-      closeInternal();
+    if (deltaCommitsInfo.get().getKey().containsInstant(deltaCommitInstant)) {
+      LOG.info("Rolling back MDT deltacommit " + commitInstantTime);
+      if (!getWriteClient().rollback(commitInstantTime, instantTime)) {
+        throw new HoodieMetadataException("Failed to rollback deltacommit at " 
+ commitInstantTime);
+      }
+    } else {
+      LOG.info(String.format("Ignoring rollback of instant %s at %s since 
there are no corresponding deltacommits on MDT",
+          commitInstantTime, instantTime));
     }
+
+    // Rollback of MOR table may end up adding a new log file. So we need to 
check for added files and add them to MDT
+    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
metadataMetaClient.getActiveTimeline(),
+        rollbackMetadata, getRecordsGenerationParams(), instantTime,
+        metadata.getSyncedInstantTime(), true), false);

Review Comment:
   Just wanted to double-check: in the list of valid instants we populate 
while reading the MDT using the Log Record Reader, we do include rollback 
instants from the DT, right? How might this pan out if an async compaction 
from the DT is rolled back multiple times and then finally gets committed? 
   
   ```
   public static Set<String> getValidInstantTimestamps(HoodieTableMetaClient 
dataMetaClient,
                                                         HoodieTableMetaClient 
metadataMetaClient) {
       // Only those log files which have a corresponding completed instant on 
the dataset should be read
       // This is because the metadata table is updated before the dataset 
instants are committed.
       HoodieActiveTimeline datasetTimeline = 
dataMetaClient.getActiveTimeline();
       Set<String> validInstantTimestamps = 
datasetTimeline.filterCompletedInstants().getInstantsAsStream()
           .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
   
       // We should also add completed indexing delta commits in the metadata 
table, as they do not
       // have corresponding completed instant in the data table
       validInstantTimestamps.addAll(
           metadataMetaClient.getActiveTimeline()
               .filter(instant -> instant.isCompleted()
                   && (isIndexingCommit(instant.getTimestamp()) || 
isLogCompactionInstant(instant)))
               .getInstantsAsStream()
               .map(HoodieInstant::getTimestamp)
               .collect(Collectors.toList()));
   
       // For any rollbacks and restores, we cannot neglect the instants that 
they are rolling back.
       // The rollback instant should be more recent than the start of the 
timeline for it to have rolled back any
       // instant which we have a log block for.
       final String earliestInstantTime = validInstantTimestamps.isEmpty() ? 
SOLO_COMMIT_TIMESTAMP : Collections.min(validInstantTimestamps);
       
datasetTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstantsAsStream()
           .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN, earliestInstantTime))
           .forEach(instant -> {
             validInstantTimestamps.addAll(getRollbackedCommits(instant, 
datasetTimeline));
           });
   
       // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid 
timestamp
       
validInstantTimestamps.add(createIndexInitTimestamp(SOLO_COMMIT_TIMESTAMP, 
PARTITION_INITIALIZATION_TIME_SUFFIX));
       return validInstantTimestamps;
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to