[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1268098329 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -742,8 +750,10 @@ private static List convertFilesToFilesPartitionRecords(Map { final String partition = getPartitionIdentifier(partitionName); fileChangeCount[1] += appendedFileMap.size(); + Map appendedFiles = new HashMap<>(); + appendedFileMap.forEach((k,v) -> appendedFiles.put(k, 1L)); Review Comment: This code has been refactored, so the feedback is no longer valid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1268077898 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -653,37 +608,57 @@ private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTim */ public static Map> convertMetadataToRecords( HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, - HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams, - String instantTime, Option lastSyncTs, boolean wasSynced) { + HoodieTableMetaClient dataTableMetaClient, + HoodieRollbackMetadata rollbackMetadata, String instantTime) { final Map> partitionToRecordsMap = new HashMap<>(); -Map> partitionToDeletedFiles = new HashMap<>(); -Map> partitionToAppendedFiles = new HashMap<>(); -List filesPartitionRecords = -convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); +List filesPartitionRecords = convertMetadataToRollbackRecords(rollbackMetadata, instantTime); + +List reAddedRecords = getHoodieRecordsForLogFilesFromRollbackPlan(dataTableMetaClient, instantTime); +filesPartitionRecords.addAll(reAddedRecords); final HoodieData rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1); + partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD); return partitionToRecordsMap; } + private static List getHoodieRecordsForLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime) { +/*List instants = dataTableMetaClient.reloadActiveTimeline().filterRequestedRollbackTimeline() +.filter(instant -> instant.getTimestamp().equals(instantTime) && instant.isRequested()).getInstants();*/ + +HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, instantTime); + +// HoodieInstant rollbackInstant = 
instants.get(0); +HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant); +try { + HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata( + dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class); + + Map> partitionToLogFilesMap = new HashMap<>(); + + rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> { + partitionToLogFilesMap.computeIfAbsent(rollbackRequest.getPartitionPath(), s -> new HashMap<>()); +// fetch only log files that are expected to be RB'd in DT as part of this rollback. these log files will not be deleted, but rendered +// invalid once rollback is complete. + partitionToLogFilesMap.get(rollbackRequest.getPartitionPath()).putAll(rollbackRequest.getLogBlocksToBeDeleted()); Review Comment: There is a chance we will have more than one entry for a given partition in rollbackPlan.getRollbackRequests(). I guess the optimization you are suggesting is based on the assumption that we call put on partitionToFilesMap only once per partition, so I don't see a real benefit here. Please let me know what you think.
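The point about several rollback requests targeting the same partition can be illustrated with a minimal sketch. It does not use Hudi classes: `Request` is a hypothetical stand-in for one entry of `rollbackPlan.getRollbackRequests()`, and the file names are made up. It only shows that `computeIfAbsent` followed by `putAll` merges entries per partition instead of overwriting them.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RollbackRequestMergeSketch {

  // Hypothetical stand-in for one rollback request: a partition path plus
  // a map of log file path -> value, mirroring logBlocksToBeDeleted.
  static final class Request {
    final String partitionPath;
    final Map<String, Long> logBlocksToBeDeleted;

    Request(String partitionPath, Map<String, Long> logBlocksToBeDeleted) {
      this.partitionPath = partitionPath;
      this.logBlocksToBeDeleted = logBlocksToBeDeleted;
    }
  }

  // Groups log files by partition. A partition may appear in several
  // requests, so each request is merged into the partition's existing map.
  static Map<String, Map<String, Long>> groupByPartition(List<Request> requests) {
    Map<String, Map<String, Long>> partitionToLogFiles = new HashMap<>();
    for (Request request : requests) {
      partitionToLogFiles
          .computeIfAbsent(request.partitionPath, p -> new HashMap<>())
          .putAll(request.logBlocksToBeDeleted);
    }
    return partitionToLogFiles;
  }

  public static void main(String[] args) {
    // Two requests for the same partition: both log files must survive.
    List<Request> requests = Arrays.asList(
        new Request("asia/india/chennai", Collections.singletonMap("a.log.1", -1L)),
        new Request("asia/india/chennai", Collections.singletonMap("b.log.1", -1L)));
    System.out.println(groupByPartition(requests));
  }
}
```

A plain `put` per request would keep only the last request's files for a partition, which is why the merge form is needed when a partition can repeat.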
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1254985856 ## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java: ## @@ -653,37 +608,57 @@ private static void processRestoreMetadata(HoodieActiveTimeline metadataTableTim */ public static Map> convertMetadataToRecords( HoodieEngineContext engineContext, HoodieActiveTimeline metadataTableTimeline, - HoodieRollbackMetadata rollbackMetadata, MetadataRecordsGenerationParams recordsGenerationParams, - String instantTime, Option lastSyncTs, boolean wasSynced) { + HoodieTableMetaClient dataTableMetaClient, + HoodieRollbackMetadata rollbackMetadata, String instantTime) { final Map> partitionToRecordsMap = new HashMap<>(); -Map> partitionToDeletedFiles = new HashMap<>(); -Map> partitionToAppendedFiles = new HashMap<>(); -List filesPartitionRecords = -convertMetadataToRollbackRecords(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, lastSyncTs, wasSynced); +List filesPartitionRecords = convertMetadataToRollbackRecords(rollbackMetadata, instantTime); + +List reAddedRecords = getHoodieRecordsForLogFilesFromRollbackPlan(dataTableMetaClient, instantTime); +filesPartitionRecords.addAll(reAddedRecords); final HoodieData rollbackRecordsRDD = engineContext.parallelize(filesPartitionRecords, 1); + partitionToRecordsMap.put(MetadataPartitionType.FILES, rollbackRecordsRDD); return partitionToRecordsMap; } + private static List getHoodieRecordsForLogFilesFromRollbackPlan(HoodieTableMetaClient dataTableMetaClient, String instantTime) { +/*List instants = dataTableMetaClient.reloadActiveTimeline().filterRequestedRollbackTimeline() +.filter(instant -> instant.getTimestamp().equals(instantTime) && instant.isRequested()).getInstants();*/ + +HoodieInstant rollbackInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, instantTime); + +// HoodieInstant rollbackInstant = 
instants.get(0); +HoodieInstant requested = HoodieTimeline.getRollbackRequestedInstant(rollbackInstant); +try { + HoodieRollbackPlan rollbackPlan = TimelineMetadataUtils.deserializeAvroMetadata( + dataTableMetaClient.getActiveTimeline().readRollbackInfoAsBytes(requested).get(), HoodieRollbackPlan.class); + + Map> partitionToLogFilesMap = new HashMap<>(); + + rollbackPlan.getRollbackRequests().forEach(rollbackRequest -> { + partitionToLogFilesMap.computeIfAbsent(rollbackRequest.getPartitionPath(), s -> new HashMap<>()); +// fetch only log files that are expected to be RB'd in DT as part of this rollback. these log files will not be deleted, but rendered +// invalid once rollback is complete. + partitionToLogFilesMap.get(rollbackRequest.getPartitionPath()).putAll(rollbackRequest.getLogBlocksToBeDeleted()); + }); Review Comment: It refers to the full log file name. Sample rollback.requested: ``` avrocat /tmp/hudi_trips_mor/.hoodie/20230705160035431.rollback.requested {"instantToRollback": {"HoodieInstantInfo": {"commitTime": "20230705160008665", "action": "deltacommit"}}, "RollbackRequests": {"array": [{"partitionPath": "americas/brazil/sao_paulo", "fileId": {"string": "0c717479-0609-4dd5-8ea4-f2edcf17986c-0"}, "latestBaseInstant": {"string": "20230705155904980"}, "filesToBeDeleted": [], "logBlocksToBeDeleted": {"map": {"file:/tmp/hudi_trips_mor/americas/brazil/sao_paulo/.0c717479-0609-4dd5-8ea4-f2edcf17986c-0_20230705155904980.log.2_0-81-113": -1}}}, {"partitionPath": "asia/india/chennai", "fileId": {"string": "27df1310-75f0-4353-9834-b0736fd3f6f3-0"}, "latestBaseInstant": {"string": "20230705155904980"}, "filesToBeDeleted": [], "logBlocksToBeDeleted": {"map": {"file:/tmp/hudi_trips_mor/asia/india/chennai/.27df1310-75f0-4353-9834-b0736fd3f6f3-0_20230705155904980.log.2_2-81-115": -1}}}, {"partitionPath": "americas/united_states/san_francisco", "fileId": {"string": "ff8f1c4b-c2e8-4f7d-b7ad-7ecb385a5f1d-0"}, "latestBaseInstant": {"string": "20230705155904980"}, 
"filesToBeDeleted": [], "logBlocksToBeDeleted": {"map": {"file:/tmp/hudi_trips_mor/americas/united_states/san_francisco/.ff8f1c4b-c2e8-4f7d-b7ad-7ecb385a5f1d-0_20230705155904980.log.2_1-81-114": -1}}}]}, "version": {"int": 1}} ```
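Since the keys of logBlocksToBeDeleted in the sample are full paths rather than bare file names, a caller that wants only the file name has to strip the directory part. A minimal sketch with plain string handling follows; `fileNameOf` is a hypothetical helper written for illustration, not a Hudi API, and Hudi's own path utilities may be the better choice in real code.

```java
final class LogFileNameSketch {

  // Returns the part of the path after the last '/', or the input itself
  // when it contains no '/'. Hypothetical helper, not a Hudi method.
  static String fileNameOf(String fullPath) {
    int slash = fullPath.lastIndexOf('/');
    return slash < 0 ? fullPath : fullPath.substring(slash + 1);
  }

  public static void main(String[] args) {
    // Key copied from the sample rollback.requested plan above.
    String key = "file:/tmp/hudi_trips_mor/asia/india/chennai/"
        + ".27df1310-75f0-4353-9834-b0736fd3f6f3-0_20230705155904980.log.2_2-81-115";
    System.out.println(fileNameOf(key));
  }
}
```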
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1254985620 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -991,10 +992,6 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) // Find the deltacommits since the last compaction Option> deltaCommitsInfo = CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); - if (!deltaCommitsInfo.isPresent() || deltaCommitsInfo.get().getKey().empty()) { Review Comment: Here, irrespective of whether the commit being rolled back is already present in the MDT, we might have to go ahead and add the delta commit. That's why I had to fix this.
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1248215039 ## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java: ## @@ -1871,7 +1865,11 @@ private void testTableOperationsImpl(HoodieSparkEngineContext engineContext, Hoo validateMetadata(client); // Restore - client.restoreToInstant("2021010100060", writeConfig.isMetadataTableEnabled()); + if (metaClient.getTableType() == COPY_ON_WRITE) { +assertThrows(HoodieRestoreException.class, () -> client.restoreToInstant("2021010100060", writeConfig.isMetadataTableEnabled())); Review Comment: @prashantwason: hey, can you help clarify why we expect this to fail only for COW tables and not for MOR?
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1248170142 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -973,52 +973,46 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { */ @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { -// The commit which is being rolled back on the dataset -final String commitInstantTime = rollbackMetadata.getCommitsRollback().get(0); -// Find the deltacommits since the last compaction -Option> deltaCommitsInfo = - CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); -if (!deltaCommitsInfo.isPresent()) { - LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no deltacommits on MDT", commitInstantTime, instantTime)); - return; -} - -// This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction) -HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue(); -HoodieTimeline deltacommitsSinceCompaction = deltaCommitsInfo.get().getKey(); - -// The deltacommit that will be rolled back -HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime); - -// The commit being rolled back should not be older than the latest compaction on the MDT. Compaction on MDT only occurs when all actions -// are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore. -if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { - final String compactionInstantTime = compactionInstant.getTimestamp(); - if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, compactionInstantTime)) { -throw new HoodieMetadataException(String.format("Commit being rolled back %s is older than the latest compaction %s. 
" -+ "There are %d deltacommits after this compaction: %s", commitInstantTime, compactionInstantTime, -deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); +if (initialized && metadata != null) { + // The commit which is being rolled back on the dataset + final String commitInstantTime = rollbackMetadata.getCommitsRollback().get(0); + // Find the deltacommits since the last compaction + Option> deltaCommitsInfo = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); + if (!deltaCommitsInfo.isPresent() || deltaCommitsInfo.get().getKey().empty()) { +LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no deltacommits on MDT", commitInstantTime, instantTime)); +return; } -} -if (deltacommitsSinceCompaction.containsInstant(deltaCommitInstant)) { - LOG.info("Rolling back MDT deltacommit " + commitInstantTime); - if (!getWriteClient().rollback(commitInstantTime, instantTime)) { -throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitInstantTime); + // This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction) + HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue(); + HoodieTimeline deltacommitsSinceCompaction = deltaCommitsInfo.get().getKey(); + + // The deltacommit that will be rolled back + HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime); + + // The commit being rolled back should not be older than the latest compaction on the MDT. Compaction on MDT only occurs when all actions + // are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore. 
+ if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { +final String compactionInstantTime = compactionInstant.getTimestamp(); +if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, compactionInstantTime)) { + throw new HoodieMetadataException(String.format("Commit being rolled back %s is older than the latest compaction %s. " + + "There are %d deltacommits after this compaction: %s", commitInstantTime, compactionInstantTime, + deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); +} } -} else { - LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no corresponding deltacommits on MDT", - commitInstantTime, instantTime)); -} -// Rollback of MOR table may end up adding a new log file. So we need to check
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1243081122 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { */ @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { -if (enabled && metadata != null) { - // Is this rollback of an instant that has been synced to the metadata table? - String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0); - boolean wasSynced = metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant)); - if (!wasSynced) { -// A compaction may have taken place on metadata table which would have included this instant being rolled back. -// Revisit this logic to relax the compaction fencing : https://issues.apache.org/jira/browse/HUDI-2458 -Option latestCompaction = metadata.getLatestCompactionTime(); -if (latestCompaction.isPresent()) { - wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get()); -} +// The commit which is being rolled back on the dataset +final String commitInstantTime = rollbackMetadata.getCommitsRollback().get(0); +// Find the deltacommits since the last compaction +Option> deltaCommitsInfo = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); +if (!deltaCommitsInfo.isPresent()) { + LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no deltacommits on MDT", commitInstantTime, instantTime)); + return; +} + +// This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction) +HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue(); +HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey(); + +// The deltacommit that will be rolled back +HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime); + +// The commit being rolled back should not be older than the latest compaction on the MDT. Compaction on MDT only occurs when all actions +// are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore. +if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { + final String compactionInstantTime = compactionInstant.getTimestamp(); + if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, compactionInstantTime)) { +throw new HoodieMetadataException(String.format("Commit being rolled back %s is older than the latest compaction %s. " ++ "There are %d deltacommits after this compaction: %s", commitInstantTime, compactionInstantTime, +deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); } +} - Map> records = - HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), - rollbackMetadata, getRecordsGenerationParams(), instantTime, - metadata.getSyncedInstantTime(), wasSynced); - commit(instantTime, records, false); - closeInternal(); +if (deltaCommitsInfo.get().getKey().containsInstant(deltaCommitInstant)) { + LOG.info("Rolling back MDT deltacommit " + commitInstantTime); + if (!getWriteClient().rollback(commitInstantTime, instantTime)) { +throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitInstantTime); + } +} else { + LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no corresponding deltacommits on MDT", + commitInstantTime, instantTime)); } + +// Rollback of MOR table may end up adding a new log file. 
So we need to check for added files and add them to MDT +processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), +rollbackMetadata, getRecordsGenerationParams(), instantTime, +metadata.getSyncedInstantTime(), true), false); Review Comment: Just wanted to double-check: in the list of valid instants we populate while reading the MDT using the Log Record Reader, we do include rollback instants from the DT, right? How might this pan out if an async compaction from the DT is rolled back multiple times and then finally gets committed? ``` public static Set getValidInstantTimestamps(HoodieTableMetaClient dataMetaClient,
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1242961967 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -851,26 +919,49 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { */ @Override public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) { -if (enabled && metadata != null) { - // Is this rollback of an instant that has been synced to the metadata table? - String rollbackInstant = rollbackMetadata.getCommitsRollback().get(0); - boolean wasSynced = metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, rollbackInstant)); - if (!wasSynced) { -// A compaction may have taken place on metadata table which would have included this instant being rolled back. -// Revisit this logic to relax the compaction fencing : https://issues.apache.org/jira/browse/HUDI-2458 -Option latestCompaction = metadata.getLatestCompactionTime(); -if (latestCompaction.isPresent()) { - wasSynced = HoodieTimeline.compareTimestamps(rollbackInstant, HoodieTimeline.LESSER_THAN_OR_EQUALS, latestCompaction.get()); -} +// The commit which is being rolled back on the dataset +final String commitInstantTime = rollbackMetadata.getCommitsRollback().get(0); +// Find the deltacommits since the last compaction +Option> deltaCommitsInfo = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(metadataMetaClient.getActiveTimeline()); +if (!deltaCommitsInfo.isPresent()) { + LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no deltacommits on MDT", commitInstantTime, instantTime)); + return; +} + +// This could be a compaction or deltacommit instant (See CompactionUtils.getDeltaCommitsSinceLatestCompaction) +HoodieInstant compactionInstant = deltaCommitsInfo.get().getValue(); +HoodieTimeline deltacommitsSinceCompaction = 
deltaCommitsInfo.get().getKey(); + +// The deltacommit that will be rolled back +HoodieInstant deltaCommitInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, commitInstantTime); + +// The commit being rolled back should not be older than the latest compaction on the MDT. Compaction on MDT only occurs when all actions +// are completed on the dataset. Hence, this case implies a rollback of completed commit which should actually be handled using restore. +if (compactionInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION)) { + final String compactionInstantTime = compactionInstant.getTimestamp(); + if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(commitInstantTime, compactionInstantTime)) { +throw new HoodieMetadataException(String.format("Commit being rolled back %s is older than the latest compaction %s. " ++ "There are %d deltacommits after this compaction: %s", commitInstantTime, compactionInstantTime, +deltacommitsSinceCompaction.countInstants(), deltacommitsSinceCompaction.getInstants())); } +} - Map> records = - HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), - rollbackMetadata, getRecordsGenerationParams(), instantTime, - metadata.getSyncedInstantTime(), wasSynced); - commit(instantTime, records, false); - closeInternal(); +if (deltaCommitsInfo.get().getKey().containsInstant(deltaCommitInstant)) { + LOG.info("Rolling back MDT deltacommit " + commitInstantTime); + if (!getWriteClient().rollback(commitInstantTime, instantTime)) { +throw new HoodieMetadataException("Failed to rollback deltacommit at " + commitInstantTime); + } +} else { + LOG.info(String.format("Ignoring rollback of instant %s at %s since there are no corresponding deltacommits on MDT", + commitInstantTime, instantTime)); } + +// Rollback of MOR table may end up adding a new log file. 
So we need to check for added files and add them to MDT +processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, metadataMetaClient.getActiveTimeline(), +rollbackMetadata, getRecordsGenerationParams(), instantTime, +metadata.getSyncedInstantTime(), true), false); Review Comment: I get it. For a MOR data table, rollback will add a new log file in the DT, so we need this to track the added file. But can we optimize this so that it gets triggered only for MOR tables, or only when there are files to be added?
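The optimization asked for in this comment could look roughly like the guard below. This is a sketch under stated assumptions, not the change that landed in the PR: `TableType` and `shouldSyncAddedFiles` are hypothetical names, the map stands in for whatever structure carries the files appended by the rollback, and the sketch combines both suggested conditions (MOR table and at least one added file) although the comment offers them as alternatives.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class RollbackSyncGateSketch {

  // Hypothetical stand-in for the data table type.
  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  // Returns true only when the extra metadata commit would have work to do:
  // the data table is MOR and the rollback appended at least one file.
  static boolean shouldSyncAddedFiles(TableType tableType,
                                      Map<String, Map<String, Long>> partitionToAppendedFiles) {
    if (tableType != TableType.MERGE_ON_READ) {
      return false;
    }
    return partitionToAppendedFiles.values().stream().anyMatch(files -> !files.isEmpty());
  }

  public static void main(String[] args) {
    Map<String, Map<String, Long>> appended = new HashMap<>();
    appended.put("asia/india/chennai", Collections.singletonMap("x.log.2", 1L));
    System.out.println(shouldSyncAddedFiles(TableType.MERGE_ON_READ, appended));
    System.out.println(shouldSyncAddedFiles(TableType.COPY_ON_WRITE, appended));
  }
}
```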
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837: URL: https://github.com/apache/hudi/pull/8837#discussion_r1239143776 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ## @@ -669,32 +669,51 @@ public void restoreToSavepoint() { * @param savepointTime Savepoint time to rollback to */ public void restoreToSavepoint(String savepointTime) { -boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled(); -if (initialMetadataTableIfNecessary) { +boolean initializeMetadataTableIfNecessary = config.isMetadataTableEnabled(); +if (initializeMetadataTableIfNecessary) { try { -// Delete metadata table directly when users trigger savepoint rollback if mdt existed and beforeTimelineStarts +// Delete metadata table directly when users trigger savepoint rollback if mdt existed and if the savePointTime is beforeTimelineStarts +// or before the oldest compaction on MDT. +// We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time. 
String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath()); HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build(); -// Same as HoodieTableMetadataUtil#processRollbackMetadata +Option latestMdtCompaction = mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant(); +boolean deleteMDT = false; +if (latestMdtCompaction.isPresent()) { + if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(savepointTime, latestMdtCompaction.get().getTimestamp())) { +LOG.warn(String.format("Deleting MDT during restore to %s as the savepoint is older than oldest compaction %s on MDT", +savepointTime, latestMdtCompaction.get().getTimestamp())); +deleteMDT = true; + } +} + HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime); // The instant required to sync rollback to MDT has been archived and the mdt syncing will be failed // So that we need to delete the whole MDT here. if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) { + LOG.warn(String.format("Deleting MDT during restore to %s as the savepoint is older than the MDT timeline %s", Review Comment: we can do a minor optimization here. ``` if (!deleteMDT && ...) 
``` Because if the previous block has already decided to delete the MDT, we don't need to check the other conditions. ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { */ @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { -processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, -metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime, -metadata.getSyncedInstantTime()), false); -closeInternal(); +dataMetaClient.reloadActiveTimeline(); + +// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the +// restore was performed. This should be always present. +final String restoreToInstantTime = dataMetaClient.getActiveTimeline().getWriteTimeline() +.getReverseOrderedInstants().findFirst().get().getTimestamp(); + +// We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time. +Option lastCompaction = metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant(); Review Comment: Is it the oldest compaction or the latest compaction? The Javadoc does not align with the code. 
## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ## @@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { */ @Override public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { -processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext, -metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime, -metadata.getSyncedInstantTime()), false); -closeInternal(); +dataMetaClient.reloadActiveTimeline(); + +// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the +// restore was performed. This should be always present. +final String restoreToInstantTime =
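The `if (!deleteMDT && ...)` suggestion from the first review comment in this message relies on short-circuit evaluation: once the compaction check has decided to delete the MDT, the timeline check is never evaluated. A minimal sketch, assuming the second check is passed in lazily; `shouldDeleteMdt` and both parameter names are hypothetical and stand in for the two conditions in restoreToSavepoint.

```java
import java.util.function.BooleanSupplier;

final class DeleteMdtDecisionSketch {

  // First condition: savepoint is not newer than the latest MDT compaction.
  // Second condition: savepoint is before the start of the MDT timeline.
  // The second check is a supplier so it can be skipped entirely.
  static boolean shouldDeleteMdt(boolean savepointNotAfterLatestCompaction,
                                 BooleanSupplier savepointBeforeTimelineStart) {
    boolean deleteMDT = savepointNotAfterLatestCompaction;
    // && short-circuits: the supplier runs only when deleteMDT is still false.
    if (!deleteMDT && savepointBeforeTimelineStart.getAsBoolean()) {
      deleteMDT = true;
    }
    return deleteMDT;
  }

  public static void main(String[] args) {
    // The supplier is not invoked here because the first condition already holds.
    System.out.println(shouldDeleteMdt(true, () -> {
      throw new IllegalStateException("timeline check should have been skipped");
    }));
  }
}
```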
[GitHub] [hudi] nsivabalan commented on a diff in pull request #8837: [HUDI-6153] Changed the rollback mechanism for MDT to actual rollbacks rather than appending revert blocks.
nsivabalan commented on code in PR #8837:
URL: https://github.com/apache/hudi/pull/8837#discussion_r1212500637

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ##
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
 */
 @Override
 public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
-processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime,
-metadata.getSyncedInstantTime()), false);
-closeInternal();
+dataMetaClient.reloadActiveTimeline();
+
+// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the
+// restore was performed. This should be always present.
+final String restoreToInstantTime = dataMetaClient.getActiveTimeline().getWriteTimeline()
+.getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+// We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time.
+Option lastCompaction = metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();

Review Comment:
Shouldn't this logic go into the restore method in BaseHoodieWriteClient, where we trigger the restore for the data table? As it stands, this is a double (repeated) validation, right?
## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ##
@@ -669,32 +669,51 @@ public void restoreToSavepoint() {
 * @param savepointTime Savepoint time to rollback to
 */
 public void restoreToSavepoint(String savepointTime) {
-boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
-if (initialMetadataTableIfNecessary) {
+boolean initializeMetadataTableIfNecessary = config.isMetadataTableEnabled();
+if (initializeMetadataTableIfNecessary) {
 try {
-// Delete metadata table directly when users trigger savepoint rollback if mdt existed and beforeTimelineStarts
+// Delete metadata table directly when users trigger savepoint rollback if mdt existed and if the savePointTime is beforeTimelineStarts
+// or before the oldest compaction on MDT.
+// We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time.
 String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
 HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
-// Same as HoodieTableMetadataUtil#processRollbackMetadata
+Option lastCompaction = mdtClient.getCommitTimeline().filterCompletedInstants().lastInstant();

Review Comment:
Rename this to `latestMdtCompaction`.

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java: ##
@@ -837,10 +840,75 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
 */
 @Override
 public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) {
-processAndCommit(instantTime, () -> HoodieTableMetadataUtil.convertMetadataToRecords(engineContext,
-metadataMetaClient.getActiveTimeline(), restoreMetadata, getRecordsGenerationParams(), instantTime,
-metadata.getSyncedInstantTime()), false);
-closeInternal();
+dataMetaClient.reloadActiveTimeline();
+
+// Since the restore has completed on the dataset, the latest write timeline instant is the one to which the
+// restore was performed. This should be always present.
+final String restoreToInstantTime = dataMetaClient.getActiveTimeline().getWriteTimeline()
+.getReverseOrderedInstants().findFirst().get().getTimestamp();
+
+// We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time.
+Option lastCompaction = metadataMetaClient.getCommitTimeline().filterCompletedInstants().lastInstant();
+if (lastCompaction.isPresent()) {
+ if (HoodieTimeline.LESSER_THAN_OR_EQUALS.test(restoreToInstantTime, lastCompaction.get().getTimestamp())) {
+String msg = String.format("Cannot restore MDT to %s because it is older than latest compaction at %s", restoreToInstantTime,
+lastCompaction.get().getTimestamp()) + ". Please delete MDT and restore again";
+LOG.error(msg);
+throw new HoodieMetadataException(msg);
+ }
+}
+
+// Restore requires the existing pipelines to be shutdown. So we can safely scan the dataset to find the current
+// list of files in the filesystem.
+List dirInfoList = listAllPartitions(dataMetaClient);
+Map dirInfoMap =