This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
new e406e5d092d0 feat(spark): add restore_to_instant stored procedure (#18696)

e406e5d092d0 is described below

commit e406e5d092d090926803946445ede3e6ac261af0
Author: Mahsood Ebrahim <[email protected]>
AuthorDate: Mon May 18 23:43:55 2026 -0700

feat(spark): add restore_to_instant stored procedure (#18696)

* feat(spark): add restore_to_instant stored procedure

Adds a Spark SQL stored procedure that performs a point-in-time table
restore to any instant on the active timeline, with optional post-restore
file-existence audit. Unlike rollback_to_savepoint, no savepoint is
required at the target instant.
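The procedure runs in one of two mutually exclusive modes, selected by `audit_only`. `RestoreToInstantProcedure.call` in the accompanying diff rejects any argument combination that would make `instant_time` or `start_restore_time` ambiguous. The Java sketch below mirrors those four checks as a pure function so the rules can be exercised without Spark. The class `RestoreArgsCheck` and its method are hypothetical illustrations, not Hudi API. Unlike the real procedure, which throws `HoodieException`, the sketch returns the first violated rule as an `Optional`, and its messages are shortened.

```java
import java.util.Optional;

/**
 * Hypothetical sketch (not Hudi API) of the argument cross-validation in
 * RestoreToInstantProcedure.call. Rules are checked in the same order as the
 * Scala code; instead of throwing, the first violation is returned.
 */
final class RestoreArgsCheck {

  private RestoreArgsCheck() {}

  static Optional<String> firstError(Optional<String> instantTime,
                                     Optional<String> startRestoreTime,
                                     boolean auditOnly) {
    // Restore mode needs a target instant.
    if (!auditOnly && !instantTime.isPresent()) {
      return Optional.of("instant_time is required when audit_only=false.");
    }
    // Audit-only mode identifies an earlier restore by its own timeline timestamp.
    if (auditOnly && !startRestoreTime.isPresent()) {
      return Optional.of("start_restore_time is required when audit_only=true.");
    }
    // Each identifier is only meaningful in its own mode.
    if (!auditOnly && startRestoreTime.isPresent()) {
      return Optional.of("start_restore_time may only be specified when audit_only=true.");
    }
    if (auditOnly && instantTime.isPresent()) {
      return Optional.of("instant_time may only be specified when audit_only=false.");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Restore mode: accepted.
    System.out.println(firstError(Optional.of("20260518100000000"), Optional.empty(), false));
    // Audit-only re-check of an earlier restore: accepted.
    System.out.println(firstError(Optional.empty(), Optional.of("20260518110000000"), true));
    // Both identifiers in audit-only mode: rejected.
    System.out.println(firstError(Optional.of("20260518100000000"),
        Optional.of("20260518110000000"), true));
  }
}
```

Checking the rules in a fixed order means exactly one message is reported for any bad combination, which matches the throw-on-first-failure behaviour of the Scala code.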
Centralizes the MDT pre-check that was inlined in restoreToSavepoint into
a new BaseHoodieWriteClient.shouldDeleteMdtBeforeRestore helper, and
extends restoreToInstant to invoke it. The helper also catches the
penultimate-compaction case (target at or before the second-most-recent
MDT compaction) which the previous restoreToSavepoint inline check
missed. IO/permission failures now surface as HoodieException instead of
being silently swallowed.

The audit returns a tri-state result (PASSED / FAILED / INCONCLUSIVE) so
transient cloud-storage timeouts are distinguishable from real
audit failures; an audit_only mode lets users re-audit a previously
completed restore by passing its restore_instant_time.
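The tri-state verdict described above is a small aggregation over per-file existence checks. The Java sketch below models it under stated assumptions. `ExistenceCheck` is a hypothetical stand-in for the storage `exists` call that the Scala audit runs on executors. As in the diff's audit, any exception from a check is recorded as indeterminate instead of failing the audit. A confirmed-present file always wins, so a single leftover file yields FAILED even if other checks timed out. All class and enum names here are illustrative, not Hudi API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not Hudi API) of the PASSED / FAILED / INCONCLUSIVE audit verdict. */
final class RestoreAuditSketch {

  private RestoreAuditSketch() {}

  /** Per-file outcome, mirroring the Present / Absent / IndeterminateError cases in the diff. */
  enum FileStatus { PRESENT, ABSENT, INDETERMINATE }

  enum AuditResult { PASSED, FAILED, INCONCLUSIVE }

  /** Assumed stand-in for a storage existence check that may fail with an IO error. */
  @FunctionalInterface
  interface ExistenceCheck {
    boolean exists(String path) throws Exception;
  }

  static FileStatus classify(String path, ExistenceCheck check) {
    try {
      return check.exists(path) ? FileStatus.PRESENT : FileStatus.ABSENT;
    } catch (Exception e) {
      // e.g. a transient cloud-storage timeout: the file's state is unknown, not "present".
      return FileStatus.INDETERMINATE;
    }
  }

  /** Files listed as successfully deleted by the restore should all be absent now. */
  static AuditResult audit(List<String> successfullyDeletedFiles, ExistenceCheck check) {
    List<FileStatus> outcomes = new ArrayList<>();
    for (String path : successfullyDeletedFiles) {
      outcomes.add(classify(path, check));
    }
    if (outcomes.contains(FileStatus.PRESENT)) {
      return AuditResult.FAILED;        // a real audit failure outranks any IO error
    }
    if (outcomes.contains(FileStatus.INDETERMINATE)) {
      return AuditResult.INCONCLUSIVE;  // nothing confirmed present, but not everything was checked
    }
    return AuditResult.PASSED;          // also covers the "no files to audit" case
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("2026/05/18/a.parquet", "2026/05/18/b.parquet");
    System.out.println(audit(files, path -> false));                       // PASSED
    System.out.println(audit(files, path -> path.endsWith("a.parquet")));  // FAILED
    System.out.println(audit(files, path -> {
      throw new java.io.IOException("read timed out");
    }));                                                                   // INCONCLUSIVE
  }
}
```

Because INCONCLUSIVE is reported separately, a caller can retry with `audit_only=true` instead of treating a flaky storage call as a broken restore.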
Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>

* fix(spark): address PR review: public MDT guard, variable rename, test coverage

Add a public deleteMetadataTableIfNecessaryBeforeRestore method to
BaseHoodieWriteClient so callers that drive restore via restoreToInstant
directly (e.g. the restore_to_instant procedure) can pre-check and
pre-emptively delete the MDT before calling restoreToInstant. This
closes the coverage gap identified in review: the procedure previously
had no MDT protection after shouldDeleteMdtBeforeRestore was removed
from restoreToInstant.

The procedure now calls client.deleteMetadataTableIfNecessaryBeforeRestore
before restoreToInstant and passes the returned boolean as
initialMetadataTableIfNecessary, ensuring the penultimate/oldest
compaction and timeline-start checks fire on the procedure path.

Rename internal Scala variable restoreInstantTime -> startRestoreTimeArg
for consistency with the start_restore_time parameter name.

Strengthen testRestoreToInstantSkipsMdtCheckWhenMetadataDisabled: the
test now verifies (a) deleteMetadataTableIfNecessaryBeforeRestore
returns false and deletes the MDT for a target at/before the oldest
compaction, and (b) restoreToInstant(target, false) then proceeds
without invoking the guard.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>

* fix(spark): address PR review round 2 — remove penultimate check, rename public API

Remove the penultimate-compaction check from shouldDeleteMdtBeforeRestore.
The check fired too aggressively: it deleted the MDT even when the MDT
could restore successfully, and then the fresh re-bootstrap failed for
record_index-enabled tables (testRLIWithMDTCleaning) or triggered an
inline MDT compaction during bootstrap that broke the
isMetadataTableRecreatedDuringRestore detection
(testRestoreToSavepointDeletesMdtWhenTargetIsBeforePenultimateCompaction).
Only the oldest-compaction and timeline-start checks remain, matching
the original pre-PR behaviour.

Add a catch clause for HoodieException (e.g. TableNotFoundException from
a partially initialized MDT directory) so a corrupt-but-present MDT is
treated as absent rather than hard-failing the restore.
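After round 2, the pre-check comes down to two timestamp comparisons plus an exception policy: IO failures abort, and an unreadable MDT is treated as absent. The Java sketch below models that decision with plain strings. It assumes that Hudi's fixed-width instant timestamps order correctly as strings, as the `LESSER_THAN_OR_EQUALS` comparison on `requestedTime()` relies on. `MdtTimelineView`, `MdtUnreadableException` and the `inMemory` factory are hypothetical stand-ins for the MDT meta client and `HoodieException`. "Before the timeline start" is approximated here as strictly earlier than the first instant on the MDT commits timeline.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch (not Hudi API) of the post-round-2 shouldDeleteMdtBeforeRestore decision. */
final class MdtRestoreGuardSketch {

  private MdtRestoreGuardSketch() {}

  /** Assumed read-only view of the metadata table; instant lists are in ascending order. */
  interface MdtTimelineView {
    boolean exists() throws IOException;
    List<String> completedCompactions();
    List<String> commitsTimeline();
  }

  /** Stand-in for HoodieException / TableNotFoundException from a half-initialized MDT. */
  static final class MdtUnreadableException extends RuntimeException {
    MdtUnreadableException(String message) {
      super(message);
    }
  }

  static boolean shouldDeleteMdtBeforeRestore(String targetInstant, MdtTimelineView mdt) {
    try {
      if (!mdt.exists()) {
        return false;  // no MDT, nothing to delete
      }
      List<String> compactions = mdt.completedCompactions();
      // Check 1: target at or before the oldest completed MDT compaction.
      if (!compactions.isEmpty() && targetInstant.compareTo(compactions.get(0)) <= 0) {
        return true;
      }
      // Check 2: target before the start of the MDT timeline (history archived away).
      List<String> commits = mdt.commitsTimeline();
      return !commits.isEmpty() && targetInstant.compareTo(commits.get(0)) < 0;
    } catch (IOException e) {
      // Permission or network failure: surface it instead of skipping the integrity check.
      throw new IllegalStateException("Failed to inspect MDT before restore to " + targetInstant, e);
    } catch (MdtUnreadableException e) {
      // Present but unreadable MDT: treat as absent and let the restore proceed.
      return false;
    }
  }

  static MdtTimelineView inMemory(boolean exists, List<String> compactions, List<String> commits) {
    return new MdtTimelineView() {
      @Override public boolean exists() { return exists; }
      @Override public List<String> completedCompactions() { return compactions; }
      @Override public List<String> commitsTimeline() { return commits; }
    };
  }

  public static void main(String[] args) {
    MdtTimelineView mdt = inMemory(true, Arrays.asList("005"), Arrays.asList("004", "005", "006"));
    System.out.println(shouldDeleteMdtBeforeRestore("003", mdt));  // true: before oldest compaction
    System.out.println(shouldDeleteMdtBeforeRestore("006", mdt));  // false: MDT can follow the restore
    System.out.println(shouldDeleteMdtBeforeRestore("003",
        inMemory(false, Collections.emptyList(), Collections.emptyList())));  // false: no MDT
  }
}
```

The wrapped exception is thrown from inside a catch block, so the sibling `catch (MdtUnreadableException)` cannot swallow it. The Java code in the diff relies on the same language rule when it throws a new `HoodieException` from its `IOException` handler.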
Rename deleteMetadataTableIfNecessaryBeforeRestore ->
deleteMdtIfNecessaryBeforeRestore and flip the return value so that
true = MDT was deleted (callers negate with ! when passing to
restoreToInstant). Update RestoreToInstantProcedure and the
corresponding Java test accordingly.
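With the flipped return value, a caller derives the `restoreToInstant` flag from two inputs: whether metadata is enabled, and whether the guard just deleted the MDT. The Java sketch below captures the procedure's calling sequence. `RestoreClient` is a hypothetical two-method stand-in for `SparkRDDWriteClient`; the real `restoreToInstant` returns `HoodieRestoreMetadata`, which is omitted here. `RecordingClient` is a test double, not part of Hudi.

```java
/** Hypothetical sketch (not Hudi API) of how RestoreToInstantProcedure sequences the two calls. */
final class RestoreCallerSketch {

  private RestoreCallerSketch() {}

  /** Assumed stand-in for the two SparkRDDWriteClient methods the procedure uses. */
  interface RestoreClient {
    /** Returns true when the MDT was deleted. */
    boolean deleteMdtIfNecessaryBeforeRestore(String targetInstant);

    void restoreToInstant(String targetInstant, boolean initialMetadataTableIfNecessary);
  }

  static void restore(RestoreClient client, String targetInstant, boolean enableMetadata) {
    // The guard only runs when metadata is enabled (short-circuit, like the Scala if/else).
    boolean mdtDeleted = enableMetadata && client.deleteMdtIfNecessaryBeforeRestore(targetInstant);
    // A just-deleted MDT must not be re-initialized by the restore itself.
    client.restoreToInstant(targetInstant, !mdtDeleted && enableMetadata);
  }

  /** Test double that records guard calls and the flag handed to restoreToInstant. */
  static final class RecordingClient implements RestoreClient {
    private final boolean guardDeletes;
    int guardCalls = 0;
    Boolean initFlag = null;

    RecordingClient(boolean guardDeletes) {
      this.guardDeletes = guardDeletes;
    }

    @Override
    public boolean deleteMdtIfNecessaryBeforeRestore(String targetInstant) {
      guardCalls++;
      return guardDeletes;
    }

    @Override
    public void restoreToInstant(String targetInstant, boolean initialMetadataTableIfNecessary) {
      initFlag = initialMetadataTableIfNecessary;
    }
  }

  public static void main(String[] args) {
    RecordingClient deleted = new RecordingClient(true);
    restore(deleted, "20260518100000000", true);
    System.out.println(deleted.guardCalls + " " + deleted.initFlag);   // 1 false

    RecordingClient kept = new RecordingClient(false);
    restore(kept, "20260518100000000", true);
    System.out.println(kept.guardCalls + " " + kept.initFlag);         // 1 true

    RecordingClient disabled = new RecordingClient(true);
    restore(disabled, "20260518100000000", false);
    System.out.println(disabled.guardCalls + " " + disabled.initFlag); // 0 false
  }
}
```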
Restore the @Deprecated annotation on the two-arg rollback overload that
was accidentally dropped in a prior commit.

Remove
testRestoreToSavepointDeletesMdtWhenTargetIsBeforePenultimateCompaction
which tested the now-removed penultimate check.

Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>

---------

Co-authored-by: mahsoode <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 120 +++++---
.../TestSavepointRestoreCopyOnWrite.java | 89 ++++++
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../procedures/RestoreToInstantProcedure.scala | 309 +++++++++++++++++++++
.../sql/hudi/procedure/TestRestoreProcedure.scala | 258 +++++++++++++++++
5 files changed, 739 insertions(+), 38 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index df06df2fbbba..76ffe51a256d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -92,6 +92,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -846,44 +847,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   public void restoreToSavepoint(String savepointTime) {
     boolean initializeMetadataTableIfNecessary = config.isMetadataTableEnabled();
-    if (initializeMetadataTableIfNecessary) {
-      try {
-        // Delete metadata table directly when users trigger savepoint rollback if mdt existed and if the savePointTime is beforeTimelineStarts
-        // or before the oldest compaction on MDT.
-        // We cannot restore to before the oldest compaction on MDT as we don't have the basefiles before that time.
-        HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
-            .setConf(storageConf.newInstance())
-            .setBasePath(getMetadataTableBasePath(config.getBasePath())).build();
-        Option<HoodieInstant> oldestMdtCompaction = mdtMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
-        boolean deleteMDT = false;
-        if (oldestMdtCompaction.isPresent()) {
-          if (LESSER_THAN_OR_EQUALS.test(savepointTime, oldestMdtCompaction.get().requestedTime())) {
-            log.warn("Deleting MDT during restore to {} as the savepoint is older than oldest compaction {} on MDT",
-                savepointTime, oldestMdtCompaction.get().requestedTime());
-            deleteMDT = true;
-          }
-        }
-
-        // The instant required to sync rollback to MDT has been archived and the mdt syncing will be failed
-        // So that we need to delete the whole MDT here.
-        if (!deleteMDT) {
-          HoodieInstant syncedInstant = mdtMetaClient.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
-          if (mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.requestedTime())) {
-            log.warn("Deleting MDT during restore to {} as the savepoint is older than the MDT timeline {}",
-                savepointTime, mdtMetaClient.getCommitsTimeline().firstInstant().get().requestedTime());
-            deleteMDT = true;
-          }
-        }
-
-        if (deleteMDT) {
-          HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
-          // rollbackToSavepoint action will try to bootstrap MDT at first but sync to MDT will fail at the current scenario.
-          // so that we need to disable metadata initialized here.
-          initializeMetadataTableIfNecessary = false;
-        }
-      } catch (Exception e) {
-        // Metadata directory does not exist
-      }
+    if (initializeMetadataTableIfNecessary && shouldDeleteMdtBeforeRestore(savepointTime)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
+      // rollbackToSavepoint action will try to bootstrap MDT at first but sync to MDT will fail at the current scenario.
+      // so that we need to disable metadata initialized here.
+      initializeMetadataTableIfNecessary = false;
     }
 
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty(), initializeMetadataTableIfNecessary);
@@ -894,6 +862,82 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
+  /**
+   * Decides whether the metadata table (MDT) must be deleted before restoring the data table to
+   * {@code targetInstant}. Returns true when restoring would leave the MDT in an inconsistent
+   * state, specifically when any of the following holds:
+   * <ol>
+   *   <li>The target is at or before the oldest completed compaction. We cannot restore to before
+   *       the oldest compaction because we don't have base files before that time.</li>
+   *   <li>The target is before the MDT timeline start (the relevant history was archived away).</li>
+   * </ol>
+   * Returns false when the MDT directory does not exist or is not readable (nothing to delete or
+   * worry about). Wraps genuine IO failures ({@link IOException}) in a {@link HoodieException}
+   * so permission / network errors surface to the caller.
+   */
+  protected boolean shouldDeleteMdtBeforeRestore(String targetInstant) {
+    String mdtBasePath = getMetadataTableBasePath(config.getBasePath());
+    try {
+      // Cheap existence check first to avoid constructing an MDT meta client when there is no MDT.
+      if (!storage.exists(new StoragePath(mdtBasePath))) {
+        return false;
+      }
+      HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+          .setConf(storageConf.newInstance())
+          .setBasePath(mdtBasePath).build();
+      List<HoodieInstant> completedCompactions = mdtMetaClient.getCommitTimeline()
+          .filterCompletedInstants().getInstants();
+      Option<HoodieInstant> oldestMdtCompaction = completedCompactions.isEmpty()
+          ? Option.empty() : Option.of(completedCompactions.get(0));
+      if (oldestMdtCompaction.isPresent()
+          && LESSER_THAN_OR_EQUALS.test(targetInstant, oldestMdtCompaction.get().requestedTime())) {
+        log.warn("Deleting MDT before restore to {}: target is at or before oldest MDT compaction {}",
+            targetInstant, oldestMdtCompaction.get().requestedTime());
+        return true;
+      }
+      if (mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(targetInstant)) {
+        log.warn("Deleting MDT before restore to {}: target is before MDT timeline start", targetInstant);
+        return true;
+      }
+      return false;
+    } catch (IOException e) {
+      throw new HoodieException(
+          "Failed to inspect MDT at " + mdtBasePath + " before restore to " + targetInstant
+              + " - refusing to silently proceed without an MDT integrity check.", e);
+    } catch (HoodieException e) {
+      // MDT directory exists but is not usable (e.g. TableNotFoundException from a partially
+      // initialized MDT). Treat as absent: no deletion needed, let the restore proceed.
+      log.warn("MDT at {} is present but could not be read ({}); skipping pre-check.",
+          mdtBasePath, e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Deletes the metadata table (MDT) if it would be left in an inconsistent state by a restore
+   * to {@code targetInstant}, and returns whether the MDT was actually deleted.
+   *
+   * <p>Callers that drive restore via {@link #restoreToInstant} directly (e.g. the
+   * {@code restore_to_instant} stored procedure) should call this method before invoking
+   * {@code restoreToInstant} and suppress MDT initialization when it returns {@code true}:
+   *
+   * <pre>{@code
+   * boolean mdtDeleted = client.deleteMdtIfNecessaryBeforeRestore(targetInstant);
+   * client.restoreToInstant(targetInstant, !mdtDeleted && enableMetadata);
+   * }</pre>
+   *
+   * @param targetInstant the instant the data table will be restored to
+   * @return {@code true} if the MDT was deleted (caller must not re-initialize it);
+   *         {@code false} otherwise (MDT either did not need deletion or does not exist)
+   */
+  public boolean deleteMdtIfNecessaryBeforeRestore(String targetInstant) {
+    if (shouldDeleteMdtBeforeRestore(targetInstant)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
+      return true;
+    }
+    return false;
+  }
+
   @Deprecated
   public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 0be971630c34..5c3c83226073 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
@@ -41,6 +43,7 @@ import java.util.Objects;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -217,4 +220,90 @@ public class TestSavepointRestoreCopyOnWrite extends HoodieClientTestBase {
       assertRowNumberEqualsTo(20);
     }
   }
+
+  /**
+   * Two-assertion test for the MDT pre-check guard:
+   * <ol>
+   *   <li>{@code deleteMdtIfNecessaryBeforeRestore(firstCommit)} returns {@code true} (and
+   *       deletes the MDT) when the target is at or before the oldest MDT compaction,
+   *       confirming the guard logic fires for this table setup.</li>
+   *   <li>{@code restoreToInstant(firstCommit, false)} does NOT invoke the pre-check, so a
+   *       caller that explicitly opts out of MDT integration is never surprised by an implicit
+   *       MDT deletion.</li>
+   * </ol>
+   */
+  @Test
+  void testRestoreToInstantSkipsMdtCheckWhenMetadataDisabled() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String firstCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      // 5 inserts so the MDT goes through one compaction with maxDeltaCommits=4.
+      for (int i = 1; i <= 5; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 1) {
+          firstCommit = newCommitTime;
+        }
+      }
+      assertRowNumberEqualsTo(50);
+
+      String mdtBasePath = HoodieTableMetadata.getMetadataTableBasePath(hoodieWriteConfig.getBasePath());
+      assertTrue(HoodieStorageUtils.getStorage(mdtBasePath, storageConf).exists(new StoragePath(mdtBasePath)),
+          "MDT directory should exist before any pre-check");
+
+      // Assertion 1: deleteMdtIfNecessaryBeforeRestore detects that firstCommit is at or before
+      // the oldest MDT compaction, deletes the MDT, and returns true.
+      boolean mdtDeleted = client.deleteMdtIfNecessaryBeforeRestore(
+          Objects.requireNonNull(firstCommit, "first commit should not be null"));
+      assertTrue(mdtDeleted,
+          "deleteMdtIfNecessaryBeforeRestore should return true when target is at or before the oldest MDT compaction");
+      assertFalse(HoodieStorageUtils.getStorage(mdtBasePath, storageConf).exists(new StoragePath(mdtBasePath)),
+          "MDT directory should have been deleted by deleteMdtIfNecessaryBeforeRestore");
+
+      // Assertion 2: restoreToInstant with initialMetadataTableIfNecessary=false does NOT invoke
+      // the pre-check — the MDT (now absent) is not touched. The restore proceeds without MDT.
+      client.restoreToInstant(firstCommit, false);
+      assertRowNumberEqualsTo(numRecords);
+    }
+  }
+
+  /**
+   * Regression coverage for the {@code restoreToSavepoint} refactor. After replacing the inline
+   * MDT pre-check with a call to the centralized helper, the common case (target after the
+   * oldest MDT compaction, no MDT delete needed) must still work end-to-end.
+   */
+  @Test
+  void testRestoreToSavepointStillWorksAfterRefactor() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) {
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 4; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(40);
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, "restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 508e37d47663..89ada91e839c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -39,6 +39,7 @@ object HoodieProcedures {
       ,(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
       ,(RollbackToSavepointProcedure.NAME, RollbackToSavepointProcedure.builder)
       ,(RollbackToInstantTimeProcedure.NAME, RollbackToInstantTimeProcedure.builder)
+      ,(RestoreToInstantProcedure.NAME, RestoreToInstantProcedure.builder)
       ,(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
       ,(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
       ,(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala
new file mode 100644
index 000000000000..ca5a2fc924ae
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.avro.model.HoodieRestoreMetadata
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.ConsistencyGuardConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.StoragePath
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.command.procedures.RestoreToInstantProcedure._
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stored procedure to perform a full point-in-time table restore to a given instant.
+ *
+ * Unlike [[RollbackToSavepointProcedure]] (which requires a savepoint at the target instant),
+ * this procedure calls restoreToInstant() directly and works on any arbitrary instant on the
+ * active timeline.
+ *
+ * Parameters:
+ * - table / path: identifies the Hudi table (one must be provided)
+ * - instant_time: target commit to restore to (required when audit_only=false; must be omitted
+ *   when audit_only=true)
+ * - start_restore_time: the restore operation's own timeline timestamp (the start_restore_time
+ *   value returned by a prior restore_to_instant call). Required when
+ *   audit_only=true; must be omitted otherwise.
+ * - enable_metadata: whether the metadata table is enabled (default: true)
+ * - rollback_parallelism: Spark parallelism for rollback and audit operations (default: 4)
+ * - enable_consistency_guard: enable consistency guard for file existence checks (default: false)
+ * - audit_post_restore: after restoring, verify that all successfully-deleted files are absent (default: false)
+ * - audit_only: skip the restore and only audit a previously completed restore instant (default: false)
+ *
+ * Output columns:
+ * - restore_result: true if restore succeeded; null if audit_only=true
+ * - start_restore_time: the restore operation's own timeline timestamp; null if audit_only=true.
+ *   Pass this value as start_restore_time to re-run the audit later.
+ * - time_taken_in_millis: restore duration; null if audit_only=true
+ * - instants_rolled_back: number of commits rolled back; null if audit_only=true
+ * - audit_result: one of "PASSED" / "FAILED" / "INCONCLUSIVE" when an audit ran; null otherwise.
+ *   INCONCLUSIVE means at least one file existence check threw an IOException
+ *   (e.g. transient cloud-storage timeout) — re-run audit_only=true to retry.
+ */
+class RestoreToInstantProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "instant_time", DataTypes.StringType),
+    ProcedureParameter.optional(2, "enable_metadata", DataTypes.BooleanType, true),
+    ProcedureParameter.optional(3, "rollback_parallelism", DataTypes.IntegerType, 4),
+    ProcedureParameter.optional(4, "enable_consistency_guard", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "audit_post_restore", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(6, "audit_only", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(7, "path", DataTypes.StringType),
+    ProcedureParameter.optional(8, "start_restore_time", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("restore_result", DataTypes.BooleanType, nullable = true, Metadata.empty),
+    StructField("start_restore_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("instants_rolled_back", DataTypes.LongType, nullable = true, Metadata.empty),
+    StructField("audit_result", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(1))
+    val enableMetadata = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val rollbackParallelism = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Int]
+    val enableConsistencyGuard = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val shouldAuditPostRestore = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Boolean]
+    val auditOnly = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[Boolean]
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(7))
+    val startRestoreTimeArg = getArgValueOrDefault(args, PARAMETERS(8))
+
+    // Cross-validation: each of (instant_time, start_restore_time) has one unambiguous meaning.
+    if (!auditOnly && instantTime.isEmpty) {
+      throw new HoodieException("instant_time is required when audit_only=false.")
+    }
+    if (auditOnly && startRestoreTimeArg.isEmpty) {
+      throw new HoodieException(
+        "start_restore_time is required when audit_only=true. " +
+          "Pass the start_restore_time value from a prior restore_to_instant call.")
+    }
+    if (!auditOnly && startRestoreTimeArg.isDefined) {
+      throw new HoodieException("start_restore_time may only be specified when audit_only=true.")
+    }
+    if (auditOnly && instantTime.isDefined) {
+      throw new HoodieException(
+        "instant_time may only be specified when audit_only=false. " +
+          "Use start_restore_time to identify a previously executed restore.")
+    }
+    if (auditOnly && shouldAuditPostRestore) {
+      logWarning("Both audit_only and audit_post_restore are set. Only audit_only will be honored.")
+    }
+
+    val basePath = getBasePath(tableName, tablePath)
+
+    val confs = Map(
+      HoodieMetadataConfig.ENABLE.key() -> enableMetadata.toString,
+      HoodieWriteConfig.ROLLBACK_PARALLELISM_VALUE.key() -> rollbackParallelism.toString,
+      HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false"
+    ) ++ (if (enableConsistencyGuard) Map(ConsistencyGuardConfig.ENABLE.key() -> "true") else Map.empty)
+
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Nullable boxed types so Row can hold null for audit_only runs
+    var restoreResult: java.lang.Boolean = null
+    var startRestoreTime: String = null
+    var timeTakenInMillis: java.lang.Long = null
+    var instantsRolledBack: java.lang.Long = null
+
+    if (!auditOnly) {
+      val targetInstant = instantTime.get.asInstanceOf[String]
+      var client: SparkRDDWriteClient[_] = null
+      try {
+        client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
+          tableName.asInstanceOf[Option[String]])
+        // Pre-check: if the target instant is at or before the oldest MDT compaction or before the
+        // MDT timeline start, pre-emptively delete the MDT so restoreToInstant does not leave it
+        // inconsistent. deleteMdtIfNecessaryBeforeRestore returns true when the MDT was deleted
+        // (caller must not re-initialize it), false otherwise.
+        val mdtDeleted = if (enableMetadata) {
+          client.deleteMdtIfNecessaryBeforeRestore(targetInstant)
+        } else false
+        val restoreMetadata = client.restoreToInstant(targetInstant, !mdtDeleted && enableMetadata)
+        restoreResult = true
+        startRestoreTime = restoreMetadata.getStartRestoreTime
+        timeTakenInMillis = restoreMetadata.getTimeTakenInMillis
+        instantsRolledBack = restoreMetadata.getInstantsToRollback.size().toLong
+      } finally {
+        if (client != null) {
+          client.close()
+        }
+      }
+      if (tableName.isDefined) {
+        spark.catalog.refreshTable(tableName.get.asInstanceOf[String])
+      }
+      // getActiveTimeline is lazily cached; reload so the new .restore instant is visible to the audit.
+      if (shouldAuditPostRestore) {
+        metaClient.reloadActiveTimeline()
+      }
+    }
+
+    var auditResult: String = null
+    if (auditOnly || shouldAuditPostRestore) {
+      val restoreInstant: HoodieInstant = if (auditOnly) {
+        val ts = startRestoreTimeArg.get.asInstanceOf[String]
+        val instants = metaClient.getActiveTimeline.getRestoreTimeline.filterCompletedInstants
+          .getInstants.asScala
+        instants.find(_.requestedTime().equals(ts)).getOrElse(
+          throw new HoodieException(s"No completed restore instant found for $ts. " +
+            "Pass the start_restore_time from a prior restore_to_instant call as start_restore_time.")
+        )
+      } else {
+        // Use startRestoreTime captured from the restore metadata — more deterministic than
+        // lastInstant() since another concurrent restore could otherwise land in between.
+        val ts = startRestoreTime
+        val instants = metaClient.getActiveTimeline.getRestoreTimeline.filterCompletedInstants
+          .getInstants.asScala
+        instants.find(_.requestedTime().equals(ts)).getOrElse(
+          throw new HoodieException(s"No completed restore instant found for $ts after restore.")
+        )
+      }
+      auditResult = auditPostRestore(metaClient, basePath, restoreInstant, rollbackParallelism)
+    }
+
+    Seq(Row(restoreResult, startRestoreTime, timeTakenInMillis, instantsRolledBack, auditResult))
+  }
+
+ /**
+ * Verifies that all files expected to have been deleted by a restore
operation are actually
+ * absent from storage. Returns one of "PASSED", "FAILED", or "INCONCLUSIVE":
+ * - PASSED: every file expected to be absent is in fact absent.
+ * - FAILED: at least one file is still present after restore.
+ * - INCONCLUSIVE: no file was confirmed present, but at least one
existence check threw
+ * an IOException (e.g. transient cloud-storage timeout).
Re-run with
+ * audit_only=true to retry.
+ */
+ private def auditPostRestore(
+ metaClient: HoodieTableMetaClient,
+ basePath: String,
+ restoreInstant: HoodieInstant,
+ rollbackParallelism: Int): String = {
+ try {
+ val restoreMetadata: HoodieRestoreMetadata =
+ metaClient.getActiveTimeline.readRestoreMetadata(restoreInstant)
+
+ val filesToCheck = new java.util.ArrayList[String]()
+ restoreMetadata.getHoodieRestoreMetadata.values.asScala.foreach {
rollbackList =>
+ rollbackList.asScala.foreach { rollback =>
+ rollback.getPartitionMetadata.asScala.foreach { case (partition, pm)
=>
+ val partitionPathStr = basePath + Path.SEPARATOR + partition
+ val partitionStoragePath = new StoragePath(partitionPathStr)
+ if (!metaClient.getStorage.exists(partitionStoragePath)) {
+ logInfo(s"Partition path $partitionStoragePath does not exist.
Skipping audit for its files.")
+ } else {
+ // Use .toString() to preserve the full absolute path. Using
.getName() would strip
+ // the path to just the filename, making the subsequent
FS.exists() check vacuously pass.
+ // Only check getSuccessDeleteFiles: these are the files the
restore intended to delete
+ // and whose absence we can meaningfully verify. Files in
getFailedDeleteFiles already
+ // failed to be deleted and are expected to still be present.
+ pm.getSuccessDeleteFiles.asScala.foreach(f =>
+ filesToCheck.add(new Path(partitionPathStr, f).toString))
+ }
+ }
+ }
+ }
+
+ if (filesToCheck.isEmpty) {
+ logInfo(s"No files to audit for restore instant
${restoreInstant.requestedTime()}")
+ "PASSED"
+ } else {
+ // HadoopFSUtils.getStorageConfWithCopy returns a Serializable StorageConfiguration; the
+ // closure captures only storageConf, so Spark's ClosureCleaner can serialize it cleanly.
+ // HadoopFSUtils.getFs internally calls prepareHadoopConf, preserving HOODIE_ENV_* (e.g.
+ // S3A) injection at cloud DCs.
+ val storageConf = HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
+ val outcomes = jsc.parallelize(filesToCheck, Math.max(1, rollbackParallelism))
+ .map { pathStr =>
+ val p = new Path(pathStr)
+ try {
+ val exists = HadoopFSUtils.getFs(p, storageConf.unwrap()).exists(p)
+ val status: AuditFileStatus = if (exists) Present else Absent
+ (pathStr, status)
+ } catch {
+ case e: Exception => (pathStr, IndeterminateError(e.getMessage))
+ }
+ }
+ .collect()
+ .asScala
+ .toArray
+
+ val present = outcomes.collect { case (p, Present) => p }
+ val indeterminate = outcomes.collect { case (p, IndeterminateError(msg)) => (p, msg) }
+ if (present.nonEmpty) {
+ logError(s"Restore audit FAILED for instant ${restoreInstant.requestedTime()}: " +
+ s"${present.length} file(s) still present, e.g. ${present.take(5).mkString(", ")}")
+ "FAILED"
+ } else if (indeterminate.nonEmpty) {
+ logWarning(s"Restore audit INCONCLUSIVE for instant ${restoreInstant.requestedTime()}: " +
+ s"${indeterminate.length} file(s) had IO errors, e.g. ${indeterminate.take(5).mkString(", ")}")
+ "INCONCLUSIVE"
+ } else {
+ logInfo(s"Restore audit PASSED for instant ${restoreInstant.requestedTime()}: " +
+ s"${outcomes.length} file(s) confirmed absent.")
+ "PASSED"
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Exception during restore audit for instant ${restoreInstant.requestedTime()}", e)
+ "INCONCLUSIVE"
+ }
+ }
+
+ override def build: Procedure = new RestoreToInstantProcedure()
+}
+
+object RestoreToInstantProcedure {
+ val NAME: String = "restore_to_instant"
+
+ def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+ override def get(): RestoreToInstantProcedure = new RestoreToInstantProcedure()
+ }
+
+ // ADT for per-file audit outcomes. Defined at object level (NOT inside auditPostRestore) so the
+ // generated bytecode does not embed a synthetic outer-class reference; otherwise Spark's
+ // ClosureCleaner cannot serialize the closure that returns these values from executors.
+ private sealed trait AuditFileStatus extends Serializable
+ private case object Absent extends AuditFileStatus
+ private case object Present extends AuditFileStatus
+ private case class IndeterminateError(message: String) extends AuditFileStatus
+}
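The tri-state reduction in auditPostRestore can be exercised without Spark or a filesystem. The following is a hypothetical, Spark-free restatement and not Hudi code. The AuditSketch object, probe, and classify are illustrative names. The exists predicate stands in for the HadoopFSUtils.getFs(...).exists(p) call that runs on executors. The sketch keeps the procedure's precedence: any file still present yields FAILED; otherwise any probe error yields INCONCLUSIVE; otherwise the result is PASSED, which also covers the empty-input case.

```scala
// Hypothetical, Spark-free sketch of the audit logic above; names are illustrative, not Hudi API.
object AuditSketch {
  // Mirrors the shape of the AuditFileStatus ADT in RestoreToInstantProcedure.
  sealed trait AuditFileStatus extends Serializable
  case object Absent extends AuditFileStatus
  case object Present extends AuditFileStatus
  final case class IndeterminateError(message: String) extends AuditFileStatus

  // Per-file probe: `exists` is an assumed stand-in for the FileSystem.exists call.
  // Any exception is recorded as an indeterminate outcome instead of failing the audit.
  def probe(path: String, exists: String => Boolean): (String, AuditFileStatus) =
    try {
      (path, if (exists(path)) Present else Absent)
    } catch {
      case e: Exception => (path, IndeterminateError(e.getMessage))
    }

  // Reduces per-file outcomes to a verdict: Present beats errors, errors beat Absent.
  def classify(outcomes: Seq[(String, AuditFileStatus)]): String = {
    val present = outcomes.collect { case (p, Present) => p }
    val indeterminate = outcomes.collect { case (p, IndeterminateError(msg)) => (p, msg) }
    if (present.nonEmpty) "FAILED"
    else if (indeterminate.nonEmpty) "INCONCLUSIVE"
    else "PASSED" // includes the empty case, matching the filesToCheck.isEmpty branch
  }
}
```

Because the probe converts exceptions into data rather than rethrowing them, a single storage timeout downgrades the verdict to INCONCLUSIVE without masking a genuine FAILED result elsewhere in the batch.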
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala
new file mode 100644
index 000000000000..7674fa6abaa8
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestRestoreProcedure extends HoodieSparkProcedureTestBase {
+
+ private def createTableAndInsertData(tableName: String, tablePath: String, tableType: String = "cow"): Array[String] = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | type = '$tableType'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName select 1, 'a1', 10.0, 1000")
+ spark.sql(s"insert into $tableName select 2, 'a2', 20.0, 1500")
+ spark.sql(s"insert into $tableName select 3, 'a3', 30.0, 2000")
+ spark.sql(s"insert into $tableName select 4, 'a4', 40.0, 2500")
+ spark.sql(s"call show_commits(table => '$tableName')").collect()
+ .map(_.getString(0)).sorted
+ }
+
+ test("Test restore_to_instant basic CoW") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+ assertResult(4)(commits.length)
+
+ // restore to the 2nd commit: only commits(0) and commits(1) should remain
+ val result = spark.sql(
+ s"call restore_to_instant(table => '$tableName', instant_time => '${commits(1)}')"
+ ).collect()
+
+ assertResult(1)(result.length)
+ assertResult(true)(result(0).getBoolean(0)) // restore_result
+ assert(result(0).getString(1) != null) // start_restore_time (dynamic timestamp)
+ assert(result(0).getLong(2) >= 0) // time_taken_in_millis
+ assert(result(0).getLong(3) >= 0L) // instants_rolled_back
+ assertResult(true)(result(0).isNullAt(4)) // audit_result = null (audit not requested)
+
+ // verify data reverted to 2 records
+ val count = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ assertResult(2)(count)
+ }
+ }
+
+ test("Test restore_to_instant basic MoR") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath, tableType = "mor")
+ assertResult(4)(commits.length)
+
+ val result = spark.sql(
+ s"call restore_to_instant(table => '$tableName', instant_time => '${commits(1)}')"
+ ).collect()
+
+ assertResult(1)(result.length)
+ assertResult(true)(result(0).getBoolean(0))
+ assert(result(0).getString(1) != null)
+ assert(result(0).getLong(2) >= 0)
+ assert(result(0).getLong(3) >= 0L)
+ assertResult(true)(result(0).isNullAt(4))
+
+ val count = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ assertResult(2)(count)
+ }
+ }
+
+ test("Test restore_to_instant using path parameter") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+ assertResult(4)(commits.length)
+
+ val result = spark.sql(
+ s"call restore_to_instant(path => '$tablePath', instant_time => '${commits(1)}')"
+ ).collect()
+
+ assertResult(1)(result.length)
+ assertResult(true)(result(0).getBoolean(0))
+ assert(result(0).isNullAt(4))
+
+ val count = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ assertResult(2)(count)
+ }
+ }
+
+ test("Test restore_to_instant with audit_post_restore") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+ assertResult(4)(commits.length)
+
+ val result = spark.sql(
+ s"""call restore_to_instant(
+ | table => '$tableName',
+ | instant_time => '${commits(1)}',
+ | audit_post_restore => true
+ |)""".stripMargin
+ ).collect()
+
+ assertResult(1)(result.length)
+ assertResult(true)(result(0).getBoolean(0)) // restore_result
+ assert(result(0).getString(1) != null) // start_restore_time
+ // audit_result should be "PASSED": all rolled-back files are absent
+ assertResult(false)(result(0).isNullAt(4))
+ assertResult("PASSED")(result(0).getString(4))
+
+ val count = spark.sql(s"select count(*) from $tableName").collect()(0).getLong(0)
+ assertResult(2)(count)
+ }
+ }
+
+ test("Test restore_to_instant audit_only mode") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+ assertResult(4)(commits.length)
+
+ // Round 1: perform the restore and capture the restore operation's own timeline timestamp
+ val restoreRows = spark.sql(
+ s"call restore_to_instant(table => '$tableName', instant_time => '${commits(1)}')"
+ ).collect()
+ assertResult(true)(restoreRows(0).getBoolean(0))
+ // start_restore_time is the restore instant's timeline timestamp, NOT commits(1)
+ val restoreInstantTs = restoreRows(0).getString(1)
+ assert(restoreInstantTs != null)
+ assert(restoreInstantTs != commits(1))
+
+ // Round 2: audit_only using start_restore_time (the original target commit is not needed)
+ val auditRows = spark.sql(
+ s"""call restore_to_instant(
+ | table => '$tableName',
+ | audit_only => true,
+ | start_restore_time => '$restoreInstantTs'
+ |)""".stripMargin
+ ).collect()
+
+ assertResult(1)(auditRows.length)
+ assertResult(true)(auditRows(0).isNullAt(0)) // restore_result = null (no restore performed)
+ assertResult(true)(auditRows(0).isNullAt(1)) // start_restore_time = null
+ assertResult(true)(auditRows(0).isNullAt(2)) // time_taken_in_millis = null
+ assertResult(true)(auditRows(0).isNullAt(3)) // instants_rolled_back = null
+ assertResult(false)(auditRows(0).isNullAt(4)) // audit_result is present
+ assertResult("PASSED")(auditRows(0).getString(4))
+ }
+ }
+
+ test("Test restore_to_instant audit_only with non-existent restore instant throws") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ createTableAndInsertData(tableName, tablePath)
+
+ // start_restore_time pointing at a timestamp that has never been a restore instant.
+ assertThrows[Exception] {
+ spark.sql(
+ s"""call restore_to_instant(
+ | table => '$tableName',
+ | audit_only => true,
+ | start_restore_time => '19700101000000000'
+ |)""".stripMargin
+ ).collect()
+ }
+ }
+ }
+
+ test("Test restore_to_instant cross-validation: audit_only=true requires start_restore_time") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ createTableAndInsertData(tableName, tablePath)
+
+ assertThrows[Exception] {
+ spark.sql(
+ s"call restore_to_instant(table => '$tableName', audit_only => true)"
+ ).collect()
+ }
+ }
+ }
+
+ test("Test restore_to_instant cross-validation: audit_only=false rejects start_restore_time") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+
+ assertThrows[Exception] {
+ spark.sql(
+ s"""call restore_to_instant(
+ | table => '$tableName',
+ | instant_time => '${commits(1)}',
+ | start_restore_time => '20990101000000000'
+ |)""".stripMargin
+ ).collect()
+ }
+ }
+ }
+
+ test("Test restore_to_instant cross-validation: audit_only=true rejects instant_time") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ val commits = createTableAndInsertData(tableName, tablePath)
+
+ assertThrows[Exception] {
+ spark.sql(
+ s"""call restore_to_instant(
+ | table => '$tableName',
+ | audit_only => true,
+ | instant_time => '${commits(1)}',
+ | start_restore_time => '20990101000000000'
+ |)""".stripMargin
+ ).collect()
+ }
+ }
+ }
+
+ test("Test restore_to_instant cross-validation: audit_only=false requires instant_time") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath + "/" + tableName
+ createTableAndInsertData(tableName, tablePath)
+
+ assertThrows[Exception] {
+ spark.sql(s"call restore_to_instant(table => '$tableName')").collect()
+ }
+ }
+ }
+}
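The four cross-validation tests above determine which argument combinations restore_to_instant accepts. The procedure's own validation code is not part of this excerpt, so what follows is only a hypothetical sketch of rules consistent with those tests. RestoreArgsSketch, validate, and the error messages are invented for illustration.

```scala
// Hypothetical sketch of argument rules consistent with the cross-validation tests;
// not the actual RestoreToInstantProcedure code, and the messages are invented.
object RestoreArgsSketch {
  def validate(
      auditOnly: Boolean,
      instantTime: Option[String],
      startRestoreTime: Option[String]): Either[String, Unit] =
    if (auditOnly) {
      // audit_only=true re-audits an existing restore, identified by its own instant
      if (startRestoreTime.isEmpty) Left("audit_only=true requires start_restore_time")
      else if (instantTime.isDefined) Left("audit_only=true does not accept instant_time")
      else Right(())
    } else {
      // a normal restore needs a target instant and must not name a prior restore
      if (instantTime.isEmpty) Left("audit_only=false requires instant_time")
      else if (startRestoreTime.isDefined) Left("start_restore_time is only valid with audit_only=true")
      else Right(())
    }
}
```

Each rejected combination corresponds to one assertThrows test: a missing start_restore_time or a stray instant_time in audit mode, and a missing instant_time or a stray start_restore_time in restore mode.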