This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e406e5d092d0 feat(spark): add restore_to_instant stored procedure 
(#18696)
e406e5d092d0 is described below

commit e406e5d092d090926803946445ede3e6ac261af0
Author: Mahsood Ebrahim <[email protected]>
AuthorDate: Mon May 18 23:43:55 2026 -0700

    feat(spark): add restore_to_instant stored procedure (#18696)
    
    * feat(spark): add restore_to_instant stored procedure
    
    Adds a Spark SQL stored procedure that performs a point-in-time table
    restore to any instant on the active timeline, with optional post-restore
    file-existence audit. Unlike rollback_to_savepoint, no savepoint is
    required at the target instant.
    
    Centralizes the MDT pre-check that was inlined in restoreToSavepoint into
    a new BaseHoodieWriteClient.shouldDeleteMdtBeforeRestore helper, and
    extends restoreToInstant to invoke it. The helper also catches the
    penultimate-compaction case (target at or before the second-most-recent
    MDT compaction) which the previous restoreToSavepoint inline check
    missed. IO/permission failures now surface as HoodieException instead of
    being silently swallowed.
    
    The audit returns a tri-state result (PASSED / FAILED / INCONCLUSIVE) so
    transient cloud-storage timeouts are distinguishable from real
    audit failures; an audit_only mode lets users re-audit a previously
    completed restore by passing its restore_instant_time.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * fix(spark): address PR review: public MDT guard, variable rename, test 
coverage
    
    Add a public deleteMetadataTableIfNecessaryBeforeRestore method to
    BaseHoodieWriteClient so callers that drive restore via restoreToInstant
    directly (e.g. the restore_to_instant procedure) can pre-check and
    pre-emptively delete the MDT before calling restoreToInstant. This
    closes the coverage gap identified in review: the procedure previously
    had no MDT protection after shouldDeleteMdtBeforeRestore was removed
    from restoreToInstant.
    
    The procedure now calls client.deleteMetadataTableIfNecessaryBeforeRestore
    before restoreToInstant and passes the returned boolean as
    initialMetadataTableIfNecessary, ensuring the penultimate/oldest
    compaction and timeline-start checks fire on the procedure path.
    
    Rename internal Scala variable restoreInstantTime -> startRestoreTimeArg
    for consistency with the start_restore_time parameter name.
    
    Strengthen testRestoreToInstantSkipsMdtCheckWhenMetadataDisabled: the
    test now verifies (a) deleteMetadataTableIfNecessaryBeforeRestore
    returns false and deletes the MDT for a target at/before the oldest
    compaction, and (b) restoreToInstant(target, false) then proceeds
    without invoking the guard.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * fix(spark): address PR review round 2 — remove penultimate check, rename 
public API
    
    Remove the penultimate-compaction check from shouldDeleteMdtBeforeRestore.
    The check fired too aggressively: it deleted the MDT even when the MDT
    could restore successfully, and then the fresh re-bootstrap failed for
    record_index-enabled tables (testRLIWithMDTCleaning) or triggered an
    inline MDT compaction during bootstrap that broke the
    isMetadataTableRecreatedDuringRestore detection
    (testRestoreToSavepointDeletesMdtWhenTargetIsBeforePenultimateCompaction).
    Only the oldest-compaction and timeline-start checks remain, matching
    the original pre-PR behaviour.
    
    Add a catch clause for HoodieException (e.g. TableNotFoundException from
    a partially initialized MDT directory) so a corrupt-but-present MDT is
    treated as absent rather than hard-failing the restore.
    
    Rename deleteMetadataTableIfNecessaryBeforeRestore ->
    deleteMdtIfNecessaryBeforeRestore and flip the return value so that
    true = MDT was deleted (callers negate with ! when passing to
    restoreToInstant). Update RestoreToInstantProcedure and the
    corresponding Java test accordingly.
    
    Restore the @Deprecated annotation on the two-arg rollback overload that
    was accidentally dropped in a prior commit.
    
    Remove 
testRestoreToSavepointDeletesMdtWhenTargetIsBeforePenultimateCompaction
    which tested the now-removed penultimate check.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    ---------
    
    Co-authored-by: mahsoode <[email protected]>
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 120 +++++---
 .../TestSavepointRestoreCopyOnWrite.java           |  89 ++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   1 +
 .../procedures/RestoreToInstantProcedure.scala     | 309 +++++++++++++++++++++
 .../sql/hudi/procedure/TestRestoreProcedure.scala  | 258 +++++++++++++++++
 5 files changed, 739 insertions(+), 38 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index df06df2fbbba..76ffe51a256d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -92,6 +92,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -846,44 +847,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    */
   public void restoreToSavepoint(String savepointTime) {
     boolean initializeMetadataTableIfNecessary = 
config.isMetadataTableEnabled();
-    if (initializeMetadataTableIfNecessary) {
-      try {
-        // Delete metadata table directly when users trigger savepoint 
rollback if mdt existed and if the savePointTime is beforeTimelineStarts
-        // or before the oldest compaction on MDT.
-        // We cannot restore to before the oldest compaction on MDT as we 
don't have the basefiles before that time.
-        HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
-            .setConf(storageConf.newInstance())
-            
.setBasePath(getMetadataTableBasePath(config.getBasePath())).build();
-        Option<HoodieInstant> oldestMdtCompaction = 
mdtMetaClient.getCommitTimeline().filterCompletedInstants().firstInstant();
-        boolean deleteMDT = false;
-        if (oldestMdtCompaction.isPresent()) {
-          if (LESSER_THAN_OR_EQUALS.test(savepointTime, 
oldestMdtCompaction.get().requestedTime())) {
-            log.warn("Deleting MDT during restore to {} as the savepoint is 
older than oldest compaction {} on MDT",
-                savepointTime, oldestMdtCompaction.get().requestedTime());
-            deleteMDT = true;
-          }
-        }
-
-        // The instant required to sync rollback to MDT has been archived and 
the mdt syncing will be failed
-        // So that we need to delete the whole MDT here.
-        if (!deleteMDT) {
-          HoodieInstant syncedInstant = 
mdtMetaClient.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
-          if 
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.requestedTime()))
 {
-            log.warn("Deleting MDT during restore to {} as the savepoint is 
older than the MDT timeline {}",
-                savepointTime, 
mdtMetaClient.getCommitsTimeline().firstInstant().get().requestedTime());
-            deleteMDT = true;
-          }
-        }
-
-        if (deleteMDT) {
-          HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
-          // rollbackToSavepoint action will try to bootstrap MDT at first but 
sync to MDT will fail at the current scenario.
-          // so that we need to disable metadata initialized here.
-          initializeMetadataTableIfNecessary = false;
-        }
-      } catch (Exception e) {
-        // Metadata directory does not exist
-      }
+    if (initializeMetadataTableIfNecessary && 
shouldDeleteMdtBeforeRestore(savepointTime)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+      // rollbackToSavepoint action will try to bootstrap MDT at first but 
sync to MDT will fail at the current scenario.
+      // so that we need to disable metadata initialized here.
+      initializeMetadataTableIfNecessary = false;
     }
 
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty(), initializeMetadataTableIfNecessary);
@@ -894,6 +862,82 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
   }
 
+  /**
+   * Decides whether the metadata table (MDT) must be deleted before restoring 
the data table to
+   * {@code targetInstant}. Returns true when restoring would leave the MDT in 
an inconsistent
+   * state, specifically when any of the following holds:
+   * <ol>
+   *   <li>The target is at or before the oldest completed compaction. We 
cannot restore to before
+   *       the oldest compaction because we don't have base files before that 
time.</li>
+   *   <li>The target is before the MDT timeline start (the relevant history 
was archived away).</li>
+   * </ol>
+   * Returns false when the MDT directory does not exist or is not readable 
(nothing to delete or
+   * worry about). Wraps genuine IO failures ({@link IOException}) in a {@link 
HoodieException}
+   * so permission / network errors surface to the caller.
+   */
+  protected boolean shouldDeleteMdtBeforeRestore(String targetInstant) {
+    String mdtBasePath = getMetadataTableBasePath(config.getBasePath());
+    try {
+      // Cheap existence check first to avoid constructing an MDT meta client 
when there is no MDT.
+      if (!storage.exists(new StoragePath(mdtBasePath))) {
+        return false;
+      }
+      HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
+          .setConf(storageConf.newInstance())
+          .setBasePath(mdtBasePath).build();
+      List<HoodieInstant> completedCompactions = 
mdtMetaClient.getCommitTimeline()
+          .filterCompletedInstants().getInstants();
+      Option<HoodieInstant> oldestMdtCompaction = 
completedCompactions.isEmpty()
+          ? Option.empty() : Option.of(completedCompactions.get(0));
+      if (oldestMdtCompaction.isPresent()
+          && LESSER_THAN_OR_EQUALS.test(targetInstant, 
oldestMdtCompaction.get().requestedTime())) {
+        log.warn("Deleting MDT before restore to {}: target is at or before 
oldest MDT compaction {}",
+            targetInstant, oldestMdtCompaction.get().requestedTime());
+        return true;
+      }
+      if 
(mdtMetaClient.getCommitsTimeline().isBeforeTimelineStarts(targetInstant)) {
+        log.warn("Deleting MDT before restore to {}: target is before MDT 
timeline start", targetInstant);
+        return true;
+      }
+      return false;
+    } catch (IOException e) {
+      throw new HoodieException(
+          "Failed to inspect MDT at " + mdtBasePath + " before restore to " + 
targetInstant
+              + " - refusing to silently proceed without an MDT integrity 
check.", e);
+    } catch (HoodieException e) {
+      // MDT directory exists but is not usable (e.g. TableNotFoundException 
from a partially
+      // initialized MDT). Treat as absent: no deletion needed, let the 
restore proceed.
+      log.warn("MDT at {} is present but could not be read ({}); skipping 
pre-check.",
+          mdtBasePath, e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Deletes the metadata table (MDT) if it would be left in an inconsistent 
state by a restore
+   * to {@code targetInstant}, and returns whether the MDT was actually 
deleted.
+   *
+   * <p>Callers that drive restore via {@link #restoreToInstant} directly 
(e.g. the
+   * {@code restore_to_instant} stored procedure) should call this method 
before invoking
+   * {@code restoreToInstant} and suppress MDT initialization when it returns 
{@code true}:
+   *
+   * <pre>{@code
+   * boolean mdtDeleted = 
client.deleteMdtIfNecessaryBeforeRestore(targetInstant);
+   * client.restoreToInstant(targetInstant, !mdtDeleted && enableMetadata);
+   * }</pre>
+   *
+   * @param targetInstant the instant the data table will be restored to
+   * @return {@code true} if the MDT was deleted (caller must not 
re-initialize it);
+   *         {@code false} otherwise (MDT either did not need deletion or does 
not exist)
+   */
+  public boolean deleteMdtIfNecessaryBeforeRestore(String targetInstant) {
+    if (shouldDeleteMdtBeforeRestore(targetInstant)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+      return true;
+    }
+    return false;
+  }
+
   @Deprecated
   public boolean rollback(final String commitInstantTime) throws 
HoodieRollbackException {
     HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, 
Option.empty());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 0be971630c34..5c3c83226073 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
@@ -41,6 +43,7 @@ import java.util.Objects;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -217,4 +220,90 @@ public class TestSavepointRestoreCopyOnWrite extends 
HoodieClientTestBase {
       assertRowNumberEqualsTo(20);
     }
   }
+
+  /**
+   * Two-assertion test for the MDT pre-check guard:
+   * <ol>
+   *   <li>{@code deleteMdtIfNecessaryBeforeRestore(firstCommit)} returns 
{@code true} (and
+   *       deletes the MDT) when the target is at or before the oldest MDT 
compaction,
+   *       confirming the guard logic fires for this table setup.</li>
+   *   <li>{@code restoreToInstant(firstCommit, false)} does NOT invoke the 
pre-check, so a
+   *       caller that explicitly opts out of MDT integration is never 
surprised by an implicit
+   *       MDT deletion.</li>
+   * </ol>
+   */
+  @Test
+  void testRestoreToInstantSkipsMdtCheckWhenMetadataDisabled() throws 
Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String firstCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      // 5 inserts so the MDT goes through one compaction with 
maxDeltaCommits=4.
+      for (int i = 1; i <= 5; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), 
INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 1) {
+          firstCommit = newCommitTime;
+        }
+      }
+      assertRowNumberEqualsTo(50);
+
+      String mdtBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(hoodieWriteConfig.getBasePath());
+      assertTrue(HoodieStorageUtils.getStorage(mdtBasePath, 
storageConf).exists(new StoragePath(mdtBasePath)),
+          "MDT directory should exist before any pre-check");
+
+      // Assertion 1: deleteMdtIfNecessaryBeforeRestore detects that 
firstCommit is at or before
+      // the oldest MDT compaction, deletes the MDT, and returns true.
+      boolean mdtDeleted = client.deleteMdtIfNecessaryBeforeRestore(
+          Objects.requireNonNull(firstCommit, "first commit should not be 
null"));
+      assertTrue(mdtDeleted,
+          "deleteMdtIfNecessaryBeforeRestore should return true when target is 
at or before the oldest MDT compaction");
+      assertFalse(HoodieStorageUtils.getStorage(mdtBasePath, 
storageConf).exists(new StoragePath(mdtBasePath)),
+          "MDT directory should have been deleted by 
deleteMdtIfNecessaryBeforeRestore");
+
+      // Assertion 2: restoreToInstant with 
initialMetadataTableIfNecessary=false does NOT invoke
+      // the pre-check — the MDT (now absent) is not touched. The restore 
proceeds without MDT.
+      client.restoreToInstant(firstCommit, false);
+      assertRowNumberEqualsTo(numRecords);
+    }
+  }
+
+  /**
+   * Regression coverage for the {@code restoreToSavepoint} refactor. After 
replacing the inline
+   * MDT pre-check with a call to the centralized helper, the common case 
(target after the
+   * oldest MDT compaction, no MDT delete needed) must still work end-to-end.
+   */
+  @Test
+  void testRestoreToSavepointStillWorksAfterRefactor() throws Exception {
+    HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withRollbackUsingMarkers(true)
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{
+      String savepointCommit = null;
+      String prevInstant = HoodieTimeline.INIT_INSTANT_TS;
+      final int numRecords = 10;
+      for (int i = 1; i <= 4; i++) {
+        String newCommitTime = WriteClientTestUtils.createNewInstantTime();
+        insertBatch(hoodieWriteConfig, client, newCommitTime, prevInstant, 
numRecords, SparkRDDWriteClient::insert,
+            false, true, numRecords, numRecords * i, 1, Option.empty(), 
INSTANT_GENERATOR);
+        prevInstant = newCommitTime;
+        if (i == 2) {
+          savepointCommit = newCommitTime;
+          client.savepoint("user1", "Savepoint for 2nd commit");
+        }
+      }
+      assertRowNumberEqualsTo(40);
+      client.restoreToSavepoint(Objects.requireNonNull(savepointCommit, 
"restore commit should not be null"));
+      assertRowNumberEqualsTo(20);
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 508e37d47663..89ada91e839c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -39,6 +39,7 @@ object HoodieProcedures {
       ,(DeleteSavepointProcedure.NAME, DeleteSavepointProcedure.builder)
       ,(RollbackToSavepointProcedure.NAME, 
RollbackToSavepointProcedure.builder)
       ,(RollbackToInstantTimeProcedure.NAME, 
RollbackToInstantTimeProcedure.builder)
+      ,(RestoreToInstantProcedure.NAME, RestoreToInstantProcedure.builder)
       ,(RunClusteringProcedure.NAME, RunClusteringProcedure.builder)
       ,(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
       ,(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala
new file mode 100644
index 000000000000..ca5a2fc924ae
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RestoreToInstantProcedure.scala
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.HoodieCLIUtils
+import org.apache.hudi.avro.model.HoodieRestoreMetadata
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.ConsistencyGuardConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.StoragePath
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.command.procedures.RestoreToInstantProcedure._
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stored procedure to perform a full point-in-time table restore to a given 
instant.
+ *
+ * Unlike [[RollbackToSavepointProcedure]] (which requires a savepoint at the 
target instant),
+ * this procedure calls restoreToInstant() directly and works on any arbitrary 
instant on the
+ * active timeline.
+ *
+ * Parameters:
+ *   - table / path: identifies the Hudi table (one must be provided)
+ *   - instant_time: target commit to restore to (required when 
audit_only=false; must be omitted
+ *                   when audit_only=true)
+ *   - start_restore_time: the restore operation's own timeline timestamp (the 
start_restore_time
+ *                   value returned by a prior restore_to_instant call). 
Required when
+ *                   audit_only=true; must be omitted otherwise.
+ *   - enable_metadata: whether the metadata table is enabled (default: true)
+ *   - rollback_parallelism: Spark parallelism for rollback and audit 
operations (default: 4)
+ *   - enable_consistency_guard: enable consistency guard for file existence 
checks (default: false)
+ *   - audit_post_restore: after restoring, verify that all 
successfully-deleted files are absent (default: false)
+ *   - audit_only: skip the restore and only audit a previously completed 
restore instant (default: false)
+ *
+ * Output columns:
+ *   - restore_result: true if restore succeeded; null if audit_only=true
+ *   - start_restore_time: the restore operation's own timeline timestamp; 
null if audit_only=true.
+ *                         Pass this value as start_restore_time to re-run the 
audit later.
+ *   - time_taken_in_millis: restore duration; null if audit_only=true
+ *   - instants_rolled_back: number of commits rolled back; null if 
audit_only=true
+ *   - audit_result: one of "PASSED" / "FAILED" / "INCONCLUSIVE" when an audit 
ran; null otherwise.
+ *                   INCONCLUSIVE means at least one file existence check 
threw an IOException
+ *                   (e.g. transient cloud-storage timeout) — re-run 
audit_only=true to retry.
+ */
+class RestoreToInstantProcedure extends BaseProcedure with ProcedureBuilder 
with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.optional(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "instant_time", DataTypes.StringType),
+    ProcedureParameter.optional(2, "enable_metadata", DataTypes.BooleanType, 
true),
+    ProcedureParameter.optional(3, "rollback_parallelism", 
DataTypes.IntegerType, 4),
+    ProcedureParameter.optional(4, "enable_consistency_guard", 
DataTypes.BooleanType, false),
+    ProcedureParameter.optional(5, "audit_post_restore", 
DataTypes.BooleanType, false),
+    ProcedureParameter.optional(6, "audit_only", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(7, "path", DataTypes.StringType),
+    ProcedureParameter.optional(8, "start_restore_time", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("restore_result", DataTypes.BooleanType, nullable = true, 
Metadata.empty),
+    StructField("start_restore_time", DataTypes.StringType, nullable = true, 
Metadata.empty),
+    StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("instants_rolled_back", DataTypes.LongType, nullable = true, 
Metadata.empty),
+    StructField("audit_result", DataTypes.StringType, nullable = true, 
Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val instantTime = getArgValueOrDefault(args, PARAMETERS(1))
+    val enableMetadata = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val rollbackParallelism = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[Int]
+    val enableConsistencyGuard = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[Boolean]
+    val shouldAuditPostRestore = getArgValueOrDefault(args, 
PARAMETERS(5)).get.asInstanceOf[Boolean]
+    val auditOnly = getArgValueOrDefault(args, 
PARAMETERS(6)).get.asInstanceOf[Boolean]
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(7))
+    val startRestoreTimeArg = getArgValueOrDefault(args, PARAMETERS(8))
+
+    // Cross-validation: each of (instant_time, start_restore_time) has one 
unambiguous meaning.
+    if (!auditOnly && instantTime.isEmpty) {
+      throw new HoodieException("instant_time is required when 
audit_only=false.")
+    }
+    if (auditOnly && startRestoreTimeArg.isEmpty) {
+      throw new HoodieException(
+        "start_restore_time is required when audit_only=true. " +
+          "Pass the start_restore_time value from a prior restore_to_instant 
call.")
+    }
+    if (!auditOnly && startRestoreTimeArg.isDefined) {
+      throw new HoodieException("start_restore_time may only be specified when 
audit_only=true.")
+    }
+    if (auditOnly && instantTime.isDefined) {
+      throw new HoodieException(
+        "instant_time may only be specified when audit_only=false. " +
+          "Use start_restore_time to identify a previously executed restore.")
+    }
+    if (auditOnly && shouldAuditPostRestore) {
+      logWarning("Both audit_only and audit_post_restore are set. Only 
audit_only will be honored.")
+    }
+
+    val basePath = getBasePath(tableName, tablePath)
+
+    val confs = Map(
+      HoodieMetadataConfig.ENABLE.key() -> enableMetadata.toString,
+      HoodieWriteConfig.ROLLBACK_PARALLELISM_VALUE.key() -> 
rollbackParallelism.toString,
+      HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false"
+    ) ++ (if (enableConsistencyGuard) Map(ConsistencyGuardConfig.ENABLE.key() 
-> "true") else Map.empty)
+
+    val metaClient = createMetaClient(jsc, basePath)
+
+    // Nullable boxed types so Row can hold null for audit_only runs
+    var restoreResult: java.lang.Boolean = null
+    var startRestoreTime: String = null
+    var timeTakenInMillis: java.lang.Long = null
+    var instantsRolledBack: java.lang.Long = null
+
+    if (!auditOnly) {
+      val targetInstant = instantTime.get.asInstanceOf[String]
+      var client: SparkRDDWriteClient[_] = null
+      try {
+        client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, 
basePath, confs,
+          tableName.asInstanceOf[Option[String]])
+        // Pre-check: if the target instant is at or before the oldest MDT 
compaction or before the
+        // MDT timeline start, pre-emptively delete the MDT so 
restoreToInstant does not leave it
+        // inconsistent. deleteMdtIfNecessaryBeforeRestore returns true when 
the MDT was deleted
+        // (caller must not re-initialize it), false otherwise.
+        val mdtDeleted = if (enableMetadata) {
+          client.deleteMdtIfNecessaryBeforeRestore(targetInstant)
+        } else false
+        val restoreMetadata = client.restoreToInstant(targetInstant, 
!mdtDeleted && enableMetadata)
+        restoreResult = true
+        startRestoreTime = restoreMetadata.getStartRestoreTime
+        timeTakenInMillis = restoreMetadata.getTimeTakenInMillis
+        instantsRolledBack = 
restoreMetadata.getInstantsToRollback.size().toLong
+      } finally {
+        if (client != null) {
+          client.close()
+        }
+      }
+      if (tableName.isDefined) {
+        spark.catalog.refreshTable(tableName.get.asInstanceOf[String])
+      }
+      // getActiveTimeline is lazily cached; reload so the new .restore 
instant is visible to the audit.
+      if (shouldAuditPostRestore) {
+        metaClient.reloadActiveTimeline()
+      }
+    }
+
+    var auditResult: String = null
+    if (auditOnly || shouldAuditPostRestore) {
+      val restoreInstant: HoodieInstant = if (auditOnly) {
+        val ts = startRestoreTimeArg.get.asInstanceOf[String]
+        val instants = 
metaClient.getActiveTimeline.getRestoreTimeline.filterCompletedInstants
+          .getInstants.asScala
+        instants.find(_.requestedTime().equals(ts)).getOrElse(
+          throw new HoodieException(s"No completed restore instant found for 
$ts. " +
+            "Pass the start_restore_time from a prior restore_to_instant call 
as start_restore_time.")
+        )
+      } else {
+        // Use startRestoreTime captured from the restore metadata — more 
deterministic than
+        // lastInstant() since another concurrent restore could otherwise land 
in between.
+        val ts = startRestoreTime
+        val instants = 
metaClient.getActiveTimeline.getRestoreTimeline.filterCompletedInstants
+          .getInstants.asScala
+        instants.find(_.requestedTime().equals(ts)).getOrElse(
+          throw new HoodieException(s"No completed restore instant found for 
$ts after restore.")
+        )
+      }
+      auditResult = auditPostRestore(metaClient, basePath, restoreInstant, 
rollbackParallelism)
+    }
+
+    Seq(Row(restoreResult, startRestoreTime, timeTakenInMillis, 
instantsRolledBack, auditResult))
+  }
+
+  /**
+   * Verifies that all files expected to have been deleted by a restore 
operation are actually
+   * absent from storage. Returns one of "PASSED", "FAILED", or "INCONCLUSIVE":
+   *   - PASSED: every file expected to be absent is in fact absent.
+   *   - FAILED: at least one file is still present after restore.
+   *   - INCONCLUSIVE: no file was confirmed present, but at least one 
existence check threw
+   *                   an IOException (e.g. transient cloud-storage timeout). 
Re-run with
+   *                   audit_only=true to retry.
+   */
+  private def auditPostRestore(
+      metaClient: HoodieTableMetaClient,
+      basePath: String,
+      restoreInstant: HoodieInstant,
+      rollbackParallelism: Int): String = {
+    try {
+      val restoreMetadata: HoodieRestoreMetadata =
+        metaClient.getActiveTimeline.readRestoreMetadata(restoreInstant)
+
+      val filesToCheck = new java.util.ArrayList[String]()
+      restoreMetadata.getHoodieRestoreMetadata.values.asScala.foreach { 
rollbackList =>
+        rollbackList.asScala.foreach { rollback =>
+          rollback.getPartitionMetadata.asScala.foreach { case (partition, pm) 
=>
+            val partitionPathStr = basePath + Path.SEPARATOR + partition
+            val partitionStoragePath = new StoragePath(partitionPathStr)
+            if (!metaClient.getStorage.exists(partitionStoragePath)) {
+              logInfo(s"Partition path $partitionStoragePath does not exist. 
Skipping audit for its files.")
+            } else {
+              // Use .toString() to preserve the full absolute path. Using 
.getName() would strip
+              // the path to just the filename, making the subsequent 
FS.exists() check vacuously pass.
+              // Only check getSuccessDeleteFiles: these are the files the 
restore intended to delete
+              // and whose absence we can meaningfully verify. Files in 
getFailedDeleteFiles already
+              // failed to be deleted and are expected to still be present.
+              pm.getSuccessDeleteFiles.asScala.foreach(f =>
+                filesToCheck.add(new Path(partitionPathStr, f).toString))
+            }
+          }
+        }
+      }
+
+      if (filesToCheck.isEmpty) {
+        logInfo(s"No files to audit for restore instant 
${restoreInstant.requestedTime()}")
+        "PASSED"
+      } else {
+        // HadoopFSUtils.getStorageConfWithCopy returns a Serializable 
StorageConfiguration; the
+        // closure captures only storageConf, so Spark's ClosureCleaner can 
serialize it cleanly.
+        // HadoopFSUtils.getFs internally calls prepareHadoopConf, preserving 
HOODIE_ENV_* (e.g.
+        // S3A) injection at cloud DCs.
+        val storageConf = 
HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())
+        val outcomes = jsc.parallelize(filesToCheck, Math.max(1, 
rollbackParallelism))
+          .map { pathStr =>
+            val p = new Path(pathStr)
+            try {
+              val exists = HadoopFSUtils.getFs(p, 
storageConf.unwrap()).exists(p)
+              val status: AuditFileStatus = if (exists) Present else Absent
+              (pathStr, status)
+            } catch {
+              case e: Exception => (pathStr, IndeterminateError(e.getMessage))
+            }
+          }
+          .collect()
+          .asScala
+          .toArray
+
+        val present = outcomes.collect { case (p, Present) => p }
+        val indeterminate = outcomes.collect { case (p, 
IndeterminateError(msg)) => (p, msg) }
+        if (present.nonEmpty) {
+          logError(s"Restore audit FAILED for instant 
${restoreInstant.requestedTime()}: " +
+            s"${present.length} file(s) still present, e.g. 
${present.take(5).mkString(", ")}")
+          "FAILED"
+        } else if (indeterminate.nonEmpty) {
+          logWarning(s"Restore audit INCONCLUSIVE for instant 
${restoreInstant.requestedTime()}: " +
+            s"${indeterminate.length} file(s) had IO errors, e.g. 
${indeterminate.take(5).mkString(", ")}")
+          "INCONCLUSIVE"
+        } else {
+          logInfo(s"Restore audit PASSED for instant 
${restoreInstant.requestedTime()}: " +
+            s"${outcomes.length} file(s) confirmed absent.")
+          "PASSED"
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Exception during restore audit for instant 
${restoreInstant.requestedTime()}", e)
+        "INCONCLUSIVE"
+    }
+  }
+
+  override def build: Procedure = new RestoreToInstantProcedure()
+}
+
+object RestoreToInstantProcedure {
+  val NAME: String = "restore_to_instant"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): RestoreToInstantProcedure = new 
RestoreToInstantProcedure()
+  }
+
+  // ADT for per-file audit outcomes. Defined at object level (NOT inside 
auditPostRestore) so the
+  // generated bytecode does not embed a synthetic outer-class reference; 
otherwise Spark's
+  // ClosureCleaner cannot serialize the closure that returns these values 
from executors.
+  private sealed trait AuditFileStatus extends Serializable
+  private case object Absent extends AuditFileStatus
+  private case object Present extends AuditFileStatus
+  private case class IndeterminateError(message: String) extends 
AuditFileStatus
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala
new file mode 100644
index 000000000000..7674fa6abaa8
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRestoreProcedure.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+class TestRestoreProcedure extends HoodieSparkProcedureTestBase {
+
+  private def createTableAndInsertData(tableName: String, tablePath: String, 
tableType: String = "cow"): Array[String] = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using hudi
+         | location '$tablePath'
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts',
+         |  type = '$tableType'
+         | )
+       """.stripMargin)
+    spark.sql(s"insert into $tableName select 1, 'a1', 10.0, 1000")
+    spark.sql(s"insert into $tableName select 2, 'a2', 20.0, 1500")
+    spark.sql(s"insert into $tableName select 3, 'a3', 30.0, 2000")
+    spark.sql(s"insert into $tableName select 4, 'a4', 40.0, 2500")
+    spark.sql(s"call show_commits(table => '$tableName')").collect()
+      .map(_.getString(0)).sorted
+  }
+
+  test("Test restore_to_instant basic CoW") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+      assertResult(4)(commits.length)
+
+      // restore to after the 2nd commit — only commits(0) and commits(1) 
should remain
+      val result = spark.sql(
+        s"call restore_to_instant(table => '$tableName', instant_time => 
'${commits(1)}')"
+      ).collect()
+
+      assertResult(1)(result.length)
+      assertResult(true)(result(0).getBoolean(0))     // restore_result
+      assert(result(0).getString(1) != null)          // start_restore_time 
(dynamic timestamp)
+      assert(result(0).getLong(2) >= 0)               // time_taken_in_millis
+      assert(result(0).getLong(3) >= 0L)              // instants_rolled_back
+      assertResult(true)(result(0).isNullAt(4))       // audit_result = null 
(audit not requested)
+
+      // verify data reverted to 2 records
+      val count = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      assertResult(2)(count)
+    }
+  }
+
+  test("Test restore_to_instant basic MoR") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath, tableType = 
"mor")
+      assertResult(4)(commits.length)
+
+      val result = spark.sql(
+        s"call restore_to_instant(table => '$tableName', instant_time => 
'${commits(1)}')"
+      ).collect()
+
+      assertResult(1)(result.length)
+      assertResult(true)(result(0).getBoolean(0))
+      assert(result(0).getString(1) != null)
+      assert(result(0).getLong(2) >= 0)
+      assert(result(0).getLong(3) >= 0L)
+      assertResult(true)(result(0).isNullAt(4))
+
+      val count = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      assertResult(2)(count)
+    }
+  }
+
+  test("Test restore_to_instant using path parameter") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+      assertResult(4)(commits.length)
+
+      val result = spark.sql(
+        s"call restore_to_instant(path => '$tablePath', instant_time => 
'${commits(1)}')"
+      ).collect()
+
+      assertResult(1)(result.length)
+      assertResult(true)(result(0).getBoolean(0))
+      assert(result(0).isNullAt(4))
+
+      val count = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      assertResult(2)(count)
+    }
+  }
+
+  test("Test restore_to_instant with audit_post_restore") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+      assertResult(4)(commits.length)
+
+      val result = spark.sql(
+        s"""call restore_to_instant(
+           |  table => '$tableName',
+           |  instant_time => '${commits(1)}',
+           |  audit_post_restore => true
+           |)""".stripMargin
+      ).collect()
+
+      assertResult(1)(result.length)
+      assertResult(true)(result(0).getBoolean(0))     // restore_result
+      assert(result(0).getString(1) != null)          // start_restore_time
+      // audit_result should be "PASSED": all rolled-back files are absent
+      assertResult(false)(result(0).isNullAt(4))
+      assertResult("PASSED")(result(0).getString(4))
+
+      val count = spark.sql(s"select count(*) from 
$tableName").collect()(0).getLong(0)
+      assertResult(2)(count)
+    }
+  }
+
+  test("Test restore_to_instant audit_only mode") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+      assertResult(4)(commits.length)
+
+      // Round 1: perform the restore and capture the restore operation's own 
timeline timestamp
+      val restoreRows = spark.sql(
+        s"call restore_to_instant(table => '$tableName', instant_time => 
'${commits(1)}')"
+      ).collect()
+      assertResult(true)(restoreRows(0).getBoolean(0))
+      // start_restore_time is the restore instant's timeline timestamp — NOT 
commits(1)
+      val restoreInstantTs = restoreRows(0).getString(1)
+      assert(restoreInstantTs != null)
+      assert(restoreInstantTs != commits(1))
+
+      // Round 2: audit_only using start_restore_time (the original target 
commit is not needed)
+      val auditRows = spark.sql(
+        s"""call restore_to_instant(
+           |  table => '$tableName',
+           |  audit_only => true,
+           |  start_restore_time => '$restoreInstantTs'
+           |)""".stripMargin
+      ).collect()
+
+      assertResult(1)(auditRows.length)
+      assertResult(true)(auditRows(0).isNullAt(0))    // restore_result = null 
(no restore performed)
+      assertResult(true)(auditRows(0).isNullAt(1))    // start_restore_time = 
null
+      assertResult(true)(auditRows(0).isNullAt(2))    // time_taken_in_millis 
= null
+      assertResult(true)(auditRows(0).isNullAt(3))    // instants_rolled_back 
= null
+      assertResult(false)(auditRows(0).isNullAt(4))   // audit_result is 
present
+      assertResult("PASSED")(auditRows(0).getString(4))
+    }
+  }
+
+  test("Test restore_to_instant audit_only with non-existent restore instant 
throws") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      createTableAndInsertData(tableName, tablePath)
+
+      // start_restore_time pointing at a timestamp that has never been a 
restore instant.
+      assertThrows[Exception] {
+        spark.sql(
+          s"""call restore_to_instant(
+             |  table => '$tableName',
+             |  audit_only => true,
+             |  start_restore_time => '19700101000000000'
+             |)""".stripMargin
+        ).collect()
+      }
+    }
+  }
+
+  test("Test restore_to_instant cross-validation: audit_only=true requires 
start_restore_time") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      createTableAndInsertData(tableName, tablePath)
+
+      assertThrows[Exception] {
+        spark.sql(
+          s"call restore_to_instant(table => '$tableName', audit_only => true)"
+        ).collect()
+      }
+    }
+  }
+
+  test("Test restore_to_instant cross-validation: audit_only=false rejects 
start_restore_time") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+
+      assertThrows[Exception] {
+        spark.sql(
+          s"""call restore_to_instant(
+             |  table => '$tableName',
+             |  instant_time => '${commits(1)}',
+             |  start_restore_time => '20990101000000000'
+             |)""".stripMargin
+        ).collect()
+      }
+    }
+  }
+
+  test("Test restore_to_instant cross-validation: audit_only=true rejects 
instant_time") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      val commits = createTableAndInsertData(tableName, tablePath)
+
+      assertThrows[Exception] {
+        spark.sql(
+          s"""call restore_to_instant(
+             |  table => '$tableName',
+             |  audit_only => true,
+             |  instant_time => '${commits(1)}',
+             |  start_restore_time => '20990101000000000'
+             |)""".stripMargin
+        ).collect()
+      }
+    }
+  }
+
+  test("Test restore_to_instant cross-validation: audit_only=false requires 
instant_time") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath + "/" + tableName
+      createTableAndInsertData(tableName, tablePath)
+
+      assertThrows[Exception] {
+        spark.sql(s"call restore_to_instant(table => '$tableName')").collect()
+      }
+    }
+  }
+}


Reply via email to