nsivabalan commented on code in PR #8837:
URL: https://github.com/apache/hudi/pull/8837#discussion_r1212500637


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String 
instantTime) {
-    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
-        metadata.getSyncedInstantTime()), false);
-    closeInternal();
+    dataMetaClient.reloadActiveTimeline();
+
+    // Since the restore has completed on the dataset, the latest write 
timeline instant is the one to which the
+    // restore was performed. This should be always present.
+    final String restoreToInstantTime = 
dataMetaClient.getActiveTimeline().getWriteTimeline()
+        .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+    // We cannot restore to before the oldest compaction on MDT as we don't 
have the basefiles before that time.
+    Option<HoodieInstant> lastCompaction = 
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();

Review Comment:
   Shouldn't this logic go into the restore method in BaseHoodieWriteClient, 
where we trigger the restore for the data table? So this is a double/repeated 
validation, right?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -669,32 +669,51 @@ public void restoreToSavepoint() {
    * @param savepointTime Savepoint time to rollback to
    */
   public void restoreToSavepoint(String savepointTime) {
-    boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
-    if (initialMetadataTableIfNecessary) {
+    boolean initializeMetadataTableIfNecessary = 
config.isMetadataTableEnabled();
+    if (initializeMetadataTableIfNecessary) {
       try {
-        // Delete metadata table directly when users trigger savepoint 
rollback if mdt existed and beforeTimelineStarts
+        // Delete metadata table directly when users trigger savepoint 
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+        // or before the oldest compaction on MDT.
+        // We cannot restore to before the oldest compaction on MDT as we 
don't have the basefiles before that time.
         String metadataTableBasePathStr = 
HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
         HoodieTableMetaClient mdtClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
-        // Same as HoodieTableMetadataUtil#processRollbackMetadata
+        Option<HoodieInstant> lastCompaction = 
mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant();

Review Comment:
   rename to "latestMdtCompaction"



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String 
instantTime) {
-    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
-        metadata.getSyncedInstantTime()), false);
-    closeInternal();
+    dataMetaClient.reloadActiveTimeline();
+
+    // Since the restore has completed on the dataset, the latest write 
timeline instant is the one to which the
+    // restore was performed. This should be always present.
+    final String restoreToInstantTime = 
dataMetaClient.getActiveTimeline().getWriteTimeline()
+        .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+    // We cannot restore to before the oldest compaction on MDT as we don't 
have the basefiles before that time.
+    Option<HoodieInstant> lastCompaction = 
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+    if (lastCompaction.isPresent()) {
+      if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime, 
lastCompaction.get().getTimestamp())) {
+        String msg = String.format("Cannot restore MDT to %s because it is 
older than latest compaction at %s", restoreToInstantTime,
+            lastCompaction.get().getTimestamp()) + ". Please delete MDT and 
restore again";
+        LOG.error(msg);
+        throw new HoodieMetadataException(msg);
+      }
+    }
+
+    // Restore requires the existing pipelines to be shutdown. So we can 
safely scan the dataset to find the current
+    // list of files in the filesystem.
+    List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+    Map<String, DirectoryInfo> dirInfoMap = 
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, 
Function.identity()));
+    dirInfoList.clear();
+
+    LOG.info("Restoring MDT to " + restoreToInstantTime + " at " + 
instantTime);
+    getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+    // At this point we have also reverted the cleans which have occurred 
after the restoreToInstantTime. Hence, a sync

Review Comment:
   do we have tests covering this scenario? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
-    if (enabled && metadata != null) {
-      // Is this rollback of an instant that has been synced to the metadata 
table?
-      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
-      boolean wasSynced = 
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
-      if (!wasSynced) {
-        // A compaction may have taken place on metadata table which would 
have included this instant being rolled back.
-        // Revisit this logic to relax the compaction fencing : 
https://issues.apache.org/jira/browse/HUDI-2458
-        Option<String> latestCompaction = metadata.getLatestCompactionTime();
-        if (latestCompaction.isPresent()) {
-          wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
-        }
+    // The commit which is being rolled back on the dataset
+    final String commitInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+    // Find the deltacommits since the last compaction
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+    if (!deltaCommitsInfo.isPresent()) {

Review Comment:
   Do we have tests for all these scenarios?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
-    if (enabled && metadata != null) {
-      // Is this rollback of an instant that has been synced to the metadata 
table?
-      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
-      boolean wasSynced = 
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
-      if (!wasSynced) {
-        // A compaction may have taken place on metadata table which would 
have included this instant being rolled back.
-        // Revisit this logic to relax the compaction fencing : 
https://issues.apache.org/jira/browse/HUDI-2458
-        Option<String> latestCompaction = metadata.getLatestCompactionTime();
-        if (latestCompaction.isPresent()) {
-          wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
-        }
+    // The commit which is being rolled back on the dataset
+    final String commitInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+    // Find the deltacommits since the last compaction
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+    if (!deltaCommitsInfo.isPresent()) {
+      LOG.info(String.format("Ignoring rollback of instant %s at %s since 
there are no deltacommits on MDT", commitInstantTime, instantTime));
+      return;
+    }
+
+    // This could be a compaction or deltacommit instant (See 
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+    HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+    HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey();
+
+    // The deltacommit that will be rolled back
+    HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime);
+
+    // The commit being rolled back should not be older than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+    // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {

Review Comment:
   Why is this check required? Can you help me understand?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRestoreMetadata restoreMetadata, String 
instantTime) {
-    processAndCommit(instantTime, () -> 
HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-        metadataMetaClient.getActiveTimeline(), restoreMetadata, 
getRecordsGenerationParams(), instantTime,
-        metadata.getSyncedInstantTime()), false);
-    closeInternal();
+    dataMetaClient.reloadActiveTimeline();
+
+    // Since the restore has completed on the dataset, the latest write 
timeline instant is the one to which the
+    // restore was performed. This should be always present.
+    final String restoreToInstantTime = 
dataMetaClient.getActiveTimeline().getWriteTimeline()
+        .getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+    // We cannot restore to before the oldest compaction on MDT as we don't 
have the basefiles before that time.
+    Option<HoodieInstant> lastCompaction = 
metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+    if (lastCompaction.isPresent()) {
+      if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime, 
lastCompaction.get().getTimestamp())) {
+        String msg = String.format("Cannot restore MDT to %s because it is 
older than latest compaction at %s", restoreToInstantTime,
+            lastCompaction.get().getTimestamp()) + ". Please delete MDT and 
restore again";
+        LOG.error(msg);
+        throw new HoodieMetadataException(msg);
+      }
+    }
+
+    // Restore requires the existing pipelines to be shutdown. So we can 
safely scan the dataset to find the current
+    // list of files in the filesystem.
+    List<DirectoryInfo> dirInfoList = listAllPartitions(dataMetaClient);
+    Map<String, DirectoryInfo> dirInfoMap = 
dirInfoList.stream().collect(Collectors.toMap(DirectoryInfo::getRelativePath, 
Function.identity()));
+    dirInfoList.clear();
+
+    LOG.info("Restoring MDT to " + restoreToInstantTime + " at " + 
instantTime);
+    getWriteClient().restoreToInstant(restoreToInstantTime, false);
+
+    // At this point we have also reverted the cleans which have occurred 
after the restoreToInstantTime. Hence, a sync
+    // is required to bring back those cleans.
+    try {
+      initTableMetadata();
+      HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata();
+      Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
+      for (String partition : metadata.fetchAllPartitionPaths()) {
+        FileStatus[] metadataFiles = metadata.getAllFilesInPartition(new 
Path(dataWriteConfig.getBasePath(), partition));
+        if (!dirInfoMap.containsKey(partition)) {
+          // Entire partition has been deleted
+          List<String> filePaths = Arrays.stream(metadataFiles).map(f -> 
f.getPath().getName()).collect(Collectors.toList());
+          HoodieCleanPartitionMetadata cleanPartitionMetadata = new 
HoodieCleanPartitionMetadata(partition, "", filePaths, filePaths,
+              Collections.emptyList(), true);
+          partitionMetadata.put(partition, cleanPartitionMetadata);
+        } else {
+          // Some files cleaned in the partition
+          Map<String, Long> fsFiles = 
dirInfoMap.get(partition).getFileNameToSizeMap();
+          List<String> filesDeleted = Arrays.stream(metadataFiles).map(f -> 
f.getPath().getName())
+              .filter(n -> 
!fsFiles.containsKey(n)).collect(Collectors.toList());
+          if (!filesDeleted.isEmpty()) {
+            LOG.info("Found deleted files in partition " + partition + ": " + 
filesDeleted);
+            HoodieCleanPartitionMetadata cleanPartitionMetadata = new 
HoodieCleanPartitionMetadata(partition, "", filesDeleted, filesDeleted,
+                Collections.EMPTY_LIST, false);
+            partitionMetadata.put(partition, cleanPartitionMetadata);
+          }
+        }
+      }
+      cleanMetadata.setPartitionMetadata(partitionMetadata);
+
+      // Even if we don't have any deleted files to sync, we still create an 
empty commit so that we can track the restore has completed.
+      // We cannot create a deltaCommit at instantTime now because a future 
block (rollback) has already been written to the logFiles.
+      // We need to choose a timestamp which would be a validInstantTime for 
MDT. This is either a commit timestamp completed on the dataset
+      // or a timestamp with suffix which we use for MDT clean, compaction etc.
+      // String syncCommitTime = 
HoodieTableMetadataUtil.createIndexInitTimestamp(HoodieActiveTimeline.createNewInstantTime());
+      // TODO: Using METADATA_INDEXER_TIME_SUFFIX for now, but this should 
have its own suffix. To be fixed after HUDI-6200
+      String syncCommitTime = HoodieActiveTimeline.createNewInstantTime() + 
METADATA_INDEXER_TIME_SUFFIX;

Review Comment:
   @codope: are there any dependencies around the indexing commit suffix? 
Do you see any issues here, or can you confirm that we are good?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, 
String instantTime) {
    */
   @Override
   public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
-    if (enabled && metadata != null) {
-      // Is this rollback of an instant that has been synced to the metadata 
table?
-      String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0);
-      boolean wasSynced = 
metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant));
-      if (!wasSynced) {
-        // A compaction may have taken place on metadata table which would 
have included this instant being rolled back.
-        // Revisit this logic to relax the compaction fencing : 
https://issues.apache.org/jira/browse/HUDI-2458
-        Option<String> latestCompaction = metadata.getLatestCompactionTime();
-        if (latestCompaction.isPresent()) {
-          wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get());
-        }
+    // The commit which is being rolled back on the dataset
+    final String commitInstantTime = 
rollbackMetadata.getCommitsRollback().get(0);
+    // Find the deltacommits since the last compaction
+    Option<Pair<HoodieTimeline, HoodieInstant>> deltaCommitsInfo =
+        
CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline());
+    if (!deltaCommitsInfo.isPresent()) {
+      LOG.info(String.format("Ignoring rollback of instant %s at %s since 
there are no deltacommits on MDT", commitInstantTime, instantTime));
+      return;
+    }
+
+    // This could be a compaction or deltacommit instant (See 
CompactionUtils.getDeltaCommitsSinceLatestCompaction)
+    HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue();
+    HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey();
+
+    // The deltacommit that will be rolled back
+    HoodieInstant deltaCommitInstant = new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime);
+
+    // The commit being rolled back should not be older than the latest 
compaction on the MDT. Compaction on MDT only occurs when all actions
+    // are completed on the dataset. Hence, this case implies a rollback of 
completed commit which should actually be handled using restore.
+    if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) {
+      final String compactionInstantTime = compactionInstant.getTimestamp();
+      if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, 
compactionInstantTime)) {
+        throw new HoodieMetadataException(String.format("Commit being rolled 
back %s is older than the latest compaction %s. "
+                + "There are %d deltacommits after this compaction: %s", 
commitInstantTime, compactionInstantTime,
+            deltacommitsSinceCompaction.countInstants(), 
deltacommitsSinceCompaction.getInstants()));
       }
+    }
 
-      Map<MetadataPartitionType, HoodieData<HoodieRecord>> records =
-          HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, 
metadataMetaClient.getActiveTimeline(),
-              rollbackMetadata, getRecordsGenerationParams(), instantTime,
-              metadata.getSyncedInstantTime(), wasSynced);
-      commit(instantTime, records, false);
-      closeInternal();
+    if (deltaCommitsInfo.get().getKey().containsInstant(deltaCommitInstant)) {
+      LOG.info("Rolling back MDT deltacommit " + commitInstantTime);
+      if (!getWriteClient().rollback(commitInstantTime, instantTime)) {
+        throw new HoodieMetadataException("Failed to rollback deltacommit at " 
+ commitInstantTime);
+      }
+    } else {
+      LOG.info(String.format("Ignoring rollback of instant %s at %s since 
there are no corresponding deltacommits on MDT",
+          commitInstantTime, instantTime));
     }
+
+    // Rollback of MOR table may end up adding a new log file. So we need to 
check for added files and add them to MDT

Review Comment:
   NTS (note to self): need to review this more closely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to