This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d94b2e2c3162 feat(spark): add show_inflight_commits and cleanup_stale_inflight_commits (#18709)
d94b2e2c3162 is described below

commit d94b2e2c3162fefd26a7c86301228f6c2c74ceef
Author: Mahsood Ebrahim <[email protected]>
AuthorDate: Mon May 18 23:44:27 2026 -0700

    feat(spark): add show_inflight_commits and cleanup_stale_inflight_commits (#18709)
    
    * feat(spark): add show_inflight_commits and cleanup_stale_inflight_commits stored procedures
    
    Two new CALL procedures so operators can inspect and remediate stale
    inflight commits via SQL instead of using hudi-cli.
    
    show_inflight_commits(table, min_age_minutes?) lists REQUESTED+INFLIGHT
    instants from the active timeline.
    
    cleanup_stale_inflight_commits(table, allowed_inflight_interval_minutes?,
    include_ingestion_commits?, dry_run?) rolls back stale write-timeline
    inflights older than the threshold (default 180 min). COMPACTION,
    LOG_COMPACTION, and CLUSTERING route to their dedicated
    table.rollbackInflight* methods (HoodieSparkTable is lazy-init on first
    such instant); other write actions go through client.rollback().
    include_ingestion_commits and dry_run both default to false; dry_run
    emits rollback_status=NULL and skips write-client construction.
    
    A single-method utility HoodieTimelineCleanupUtil
    (inflightWriteCommitsOlderThan) is added in hudi-spark-common.
    
    Tested with 4 show + 9 cleanup unit tests covering empty/threshold/
    ingestion-gating/dry_run paths plus COMPACTION, CLUSTERING, partitioned
    COW, and MOR delta_commit. checkstyle + scalastyle clean.
    
    * address review comments on show_inflight_commits / cleanup_stale_inflight_commits
    
    - Rename HoodieTimelineCleanupUtil.inflightWriteCommitsOlderThan param
      `mins` -> `ageMinutes` for clarity.
    - Replace Duration.ofMinutes(...).getSeconds() * 1000 with .toMillis()
      in HoodieTimelineCleanupUtil and ShowInflightCommitsProcedure.
    - Add inflight-state recheck in CleanupStaleInflightCommitsProcedure
      before client.rollback(): reloads the active timeline and skips the
      rollback if the instant is no longer INFLIGHT/REQUESTED, preventing
      destructive rollback of a commit that completed concurrently after
      detection.
    
    ---------
    
    Co-authored-by: mahsoode <[email protected]>
---
 .../org/apache/hudi/HoodieTimelineCleanupUtil.java |  59 +++
 .../CleanupStaleInflightCommitsProcedure.scala     | 224 +++++++++++
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../procedures/ShowInflightCommitsProcedure.scala  | 113 ++++++
 .../TestCleanupStaleInflightCommitsProcedure.scala | 413 +++++++++++++++++++++
 .../TestShowInflightCommitsProcedure.scala         | 151 ++++++++
 6 files changed, 962 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java
new file mode 100644
index 000000000000..f8583d5d5d2a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Timeline helpers backing the stale-inflight SQL procedures.
+ */
+public class HoodieTimelineCleanupUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineCleanupUtil.class);
+
+  /**
+   * Returns the pending (REQUESTED or INFLIGHT) instants of the write timeline whose requested time
+   * falls before the cutoff {@code now - ageMinutes} (as evaluated by
+   * {@link HoodieTimeline#findInstantsBefore(String)}).
+   *
+   * <p>The active timeline is reloaded first, so the result reflects storage at call time.
+   *
+   * @param metaClient              meta client of the table to inspect
+   * @param ageMinutes              minimum age, in minutes, for an instant to be returned; must be non-negative
+   * @param includeIngestionCommits when {@code false}, COMMIT and DELTA_COMMIT instants are excluded
+   * @return the matching pending instants
+   * @throws IllegalArgumentException if {@code ageMinutes} is negative
+   */
+  public static List<HoodieInstant> inflightWriteCommitsOlderThan(HoodieTableMetaClient metaClient, long ageMinutes, boolean includeIngestionCommits) {
+    // A negative age would move the cutoff into the future, so instants that started moments ago
+    // would be reported as stale (and then rolled back by the cleanup procedure).
+    if (ageMinutes < 0) {
+      throw new IllegalArgumentException("ageMinutes must be non-negative, but was " + ageMinutes);
+    }
+    long goBackMs = Duration.ofMinutes(ageMinutes).toMillis();
+    String oldestAllowedTimestamp = HoodieInstantTimeGenerator.formatDate(new Date(System.currentTimeMillis() - goBackMs));
+
+    Stream<HoodieInstant> inflightInstants = metaClient
+        .reloadActiveTimeline()
+        .getWriteTimeline()
+        .filterInflightsAndRequested()
+        .findInstantsBefore(oldestAllowedTimestamp)
+        .getInstants().stream();
+
+    if (!includeIngestionCommits) {
+      // Ingestion (COMMIT / DELTA_COMMIT) inflights are only eligible when explicitly requested.
+      Predicate<HoodieInstant> isIngestionCommit =
+          (x) -> x.getAction().equals(HoodieTimeline.COMMIT_ACTION) || x.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+      inflightInstants = inflightInstants.filter(isIngestionCommit.negate());
+    }
+    List<HoodieInstant> inflightInstantsList = inflightInstants.collect(Collectors.toList());
+    LOG.info("Inflight commits older than {} minutes: {}", ageMinutes, inflightInstantsList);
+    return inflightInstantsList;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..66e31302e45b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.{HoodieCLIUtils, HoodieTimelineCleanupUtil}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.HoodiePendingRollbackInfo
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.table.HoodieSparkTable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL stored procedure to roll back stale inflight write commits older than a configurable
+ * age threshold.
+ *
+ * SAFETY WARNING: Unlike rollback_to_instant_time which targets a single named instant, this
+ * procedure rolls back a SET of instants matched by age. A misconfigured threshold can erase
+ * recent in-progress writes when include_ingestion_commits=true. Operators should call
+ * show_inflight_commits first to preview the pending instants, and use dry_run => true (see
+ * below) when in doubt about which instants will be processed.
+ *
+ * This procedure targets the write timeline (COMMIT, DELTA_COMMIT, COMPACTION, LOG_COMPACTION,
+ * REPLACE_COMMIT, CLUSTERING actions). Stale rollback, clean, or restore inflights visible in
+ * show_inflight_commits are NOT covered by this procedure.
+ *
+ * Compaction, log-compaction, and clustering inflights are handled via the targeted table
+ * methods (table.rollbackInflightCompaction / rollbackInflightLogCompaction /
+ * rollbackInflightClustering) — the supporting state (HoodieSparkTable, table-service client,
+ * pending-rollback lookup) is constructed lazily on the first such instant and reused. All other
+ * write actions are rolled back with client.rollback().
+ *
+ * Immediately before acting on any matched instant, the active timeline is reloaded and the
+ * instant is skipped (rollback_status = false) if it is no longer REQUESTED/INFLIGHT. This guards
+ * every action type against rolling back a commit that completed concurrently after detection.
+ *
+ * Parameters:
+ *   - table:                             Required. Catalog name of the Hudi table.
+ *   - allowed_inflight_interval_minutes: Optional (default 180). Must be non-negative. Instants
+ *                                        older than this many minutes are considered stale and
+ *                                        eligible for rollback.
+ *   - include_ingestion_commits:         Optional (default false). DANGEROUS when true: enabling
+ *                                        this allows the procedure to roll back COMMIT_ACTION and
+ *                                        DELTA_COMMIT_ACTION inflights, which means it can drop
+ *                                        in-progress ingestion data. The default false is the
+ *                                        safe choice; true is for operators recovering from a
+ *                                        known-stuck ingestion job.
+ *   - dry_run:                           Optional (default false). When true, the matched-instant
+ *                                        set is resolved exactly as in normal mode but no rollback
+ *                                        calls are issued. Each returned row carries
+ *                                        rollback_status = NULL meaning "matched but not acted
+ *                                        upon". Re-run with dry_run => false to act.
+ *
+ * Output columns (one row per processed instant):
+ *   - instant_time:    The instant's requested timestamp.
+ *   - action:          The action type.
+ *   - rollback_status: true if the rollback succeeded; false if it failed or was skipped because
+ *                      the instant was no longer pending; NULL if dry_run was true (matched but
+ *                      not actioned).
+ *
+ * Example usage:
+ * {{{
+ *   -- Clean stale table-service inflights (default 180-min threshold)
+ *   CALL cleanup_stale_inflight_commits(table => 'my_table');
+ *
+ *   -- Preview what would be processed without acting
+ *   CALL cleanup_stale_inflight_commits(table => 'my_table', dry_run => true);
+ *
+ *   -- Clean stale inflights older than 1 hour, including ingestion commits (DANGEROUS)
+ *   CALL cleanup_stale_inflight_commits(
+ *     table => 'my_table',
+ *     allowed_inflight_interval_minutes => 60,
+ *     include_ingestion_commits => true
+ *   );
+ * }}}
+ *
+ * For inflight commit types not covered by this procedure (clean, restore, rollback inflights),
+ * use hudi-cli's `repair rollback` command.
+ */
+class CleanupStaleInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "allowed_inflight_interval_minutes", DataTypes.IntegerType, 180),
+    ProcedureParameter.optional(2, "include_ingestion_commits", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time",    DataTypes.StringType,  nullable = true, Metadata.empty),
+    StructField("action",          DataTypes.StringType,  nullable = true, Metadata.empty),
+    StructField("rollback_status", DataTypes.BooleanType, nullable = true, Metadata.empty)
+  ))
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def build: Procedure = new CleanupStaleInflightCommitsProcedure()
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName               = getArgValueOrDefault(args, PARAMETERS(0))
+    val allowedMinutes          = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val includeIngestionCommits = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val dryRun                  = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+
+    // A negative threshold would push the cutoff into the future and match inflights that started
+    // moments ago; reject it before touching the timeline.
+    require(allowedMinutes >= 0,
+      s"allowed_inflight_interval_minutes must be non-negative, got $allowedMinutes")
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration))
+      .setBasePath(basePath)
+      .build
+
+    val staleInflights = HoodieTimelineCleanupUtil
+      .inflightWriteCommitsOlderThan(metaClient, allowedMinutes.toLong, includeIngestionCommits)
+
+    if (staleInflights.isEmpty) {
+      Seq.empty[Row]
+    } else if (dryRun) {
+      // Dry-run: do not open the table for write — emit preview rows with NULL rollback_status
+      // to mean "matched but not actioned". Re-run with dry_run => false to act.
+      staleInflights.asScala.map { instant =>
+        Row(instant.requestedTime, instant.getAction, null)
+      }.toSeq
+    } else {
+      // Pass ROLLBACK_USING_MARKERS_ENABLE=false via the createHoodieWriteClient confs Map.
+      // Inflight commits may not have marker files, so timeline-based rollback is required.
+      // The user-specified confs win over defaults / table config / session conf — see
+      // HoodieCLIUtils.scala "Priority: defaults < catalog props < table config < sparkSession conf < specified conf".
+      val confs = Map(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false")
+      var client: SparkRDDWriteClient[_] = null
+      try {
+        client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
+          tableName.asInstanceOf[scala.Option[String]])
+
+        // Lazy state for compaction / log-compaction / clustering branches.
+        // For matched sets that contain no such instants, none of these are constructed.
+        lazy val tsClient = client.getTableServiceClient
+        lazy val table = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
+        lazy val getPendingRollbackInstantFunc: java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] =
+          new java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] {
+            override def apply(commitToRollback: String): HOption[HoodiePendingRollbackInfo] = {
+              tsClient.getPendingRollbackInfo(table.getMetaClient, commitToRollback, false)
+            }
+          }
+
+        val rows = staleInflights.asScala.map { instant =>
+          val status: java.lang.Boolean = try {
+            // Recheck that the instant is still pending before ANY rollback path. A concurrent
+            // writer or table service may have completed it after detection; rolling back a
+            // completed instant would be destructive, whatever its action type.
+            val stillPending = metaClient.reloadActiveTimeline()
+              .filterInflightsAndRequested()
+              .containsInstant(instant.requestedTime)
+            val result: Boolean = if (!stillPending) {
+              logWarning(s"Instant ${instant.requestedTime} is no longer inflight; " +
+                "skipping rollback to avoid rolling back a completed commit")
+              false
+            } else {
+              instant.getAction match {
+                case HoodieTimeline.COMPACTION_ACTION =>
+                  table.rollbackInflightCompaction(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                  true
+                case HoodieTimeline.LOG_COMPACTION_ACTION =>
+                  table.rollbackInflightLogCompaction(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                  true
+                case HoodieTimeline.CLUSTERING_ACTION =>
+                  table.rollbackInflightClustering(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                  true
+                case _ =>
+                  // client.rollback() returns false when it cannot find the instant.
+                  client.rollback(instant.requestedTime)
+              }
+            }
+            java.lang.Boolean.valueOf(result)
+          } catch {
+            // Per-instant best effort: record the failure and continue with the remaining
+            // instants, but let fatal errors (OOM, interrupts, linkage errors) propagate.
+            case scala.util.control.NonFatal(e) =>
+              logError(s"Failed to rollback inflight instant ${instant.requestedTime}", e)
+              java.lang.Boolean.FALSE
+          }
+          Row(instant.requestedTime, instant.getAction, status)
+        }.toSeq
+
+        // Refresh catalog after all rollbacks, inside try — consistent with RollbackToInstantTimeProcedure.
+        // Not placed in finally to avoid refreshing on client-creation failures.
+        if (tableName.isDefined) {
+          spark.catalog.refreshTable(tableName.get.asInstanceOf[String])
+        }
+        rows
+      } finally {
+        if (client != null) client.close()
+      }
+    }
+  }
+}
+
+object CleanupStaleInflightCommitsProcedure {
+  /** Procedure name used in `CALL cleanup_stale_inflight_commits(...)`. */
+  val NAME = "cleanup_stale_inflight_commits"
+
+  /** Supplier that builds a new procedure instance on every `get()`. */
+  def builder: Supplier[ProcedureBuilder] =
+    () => new CleanupStaleInflightCommitsProcedure()
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 89ada91e839c..f6657eee3fc6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -94,6 +94,8 @@ object HoodieProcedures {
       ,(ShowTablePropertiesProcedure.NAME, 
ShowTablePropertiesProcedure.builder)
       ,(HelpProcedure.NAME, HelpProcedure.builder)
       ,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+      ,(ShowInflightCommitsProcedure.NAME, 
ShowInflightCommitsProcedure.builder)
+      ,(CleanupStaleInflightCommitsProcedure.NAME, 
CleanupStaleInflightCommitsProcedure.builder)
       ,(RunTimelineCompactionProcedure.NAME, 
RunTimelineCompactionProcedure.builder)
       ,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
       ,(DropPartitionProcedure.NAME, DropPartitionProcedure.builder)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..3b501bdb25a1
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.time.Duration
+import java.util.Date
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL stored procedure to list all pending inflight and requested instants on a Hudi table.
+ *
+ * Unlike cleanup_stale_inflight_commits (which targets the write timeline only), this procedure
+ * queries the full active timeline and therefore includes inflight rollback, clean, restore, and
+ * indexing instants in addition to write-action instants. A stale rollback or clean inflight
+ * visible here cannot be cleaned by cleanup_stale_inflight_commits.
+ *
+ * Parameters:
+ *   - table:           Required. Catalog name of the Hudi table.
+ *   - min_age_minutes: Optional (default 0). Must be non-negative. When > 0, only instants older
+ *                      than this many minutes are returned. When 0 (default), all pending
+ *                      instants are returned.
+ *
+ * Output columns (one row per pending instant):
+ *   - instant_time: The instant's requested timestamp.
+ *   - action:       The action type (commit, delta_commit, compaction, replace, rollback, clean, etc.).
+ *   - state:        The instant's state (REQUESTED or INFLIGHT).
+ *
+ * Example usage:
+ * {{{
+ *   -- Show all pending inflights
+ *   CALL show_inflight_commits(table => 'my_table');
+ *
+ *   -- Show only inflights older than 2 hours
+ *   CALL show_inflight_commits(table => 'my_table', min_age_minutes => 120);
+ * }}}
+ */
+class ShowInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "min_age_minutes", DataTypes.IntegerType, 0)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action",       DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state",        DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def build: Procedure = new ShowInflightCommitsProcedure()
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName     = getArgValueOrDefault(args, PARAMETERS(0))
+    val minAgeMinutes = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    // A negative age has no meaning here; reject it instead of silently treating it like 0
+    // ("show everything").
+    require(minAgeMinutes >= 0, s"min_age_minutes must be non-negative, got $minAgeMinutes")
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration))
+      .setBasePath(basePath)
+      .build
+
+    val baseTimeline = metaClient.reloadActiveTimeline().filterInflightsAndRequested()
+
+    // Only apply the age cutoff when one was requested; 0 means "all pending instants".
+    val timeline = if (minAgeMinutes > 0) {
+      val goBackMs = Duration.ofMinutes(minAgeMinutes).toMillis
+      val cutoff = HoodieInstantTimeGenerator.formatDate(new Date(System.currentTimeMillis() - goBackMs))
+      baseTimeline.findInstantsBefore(cutoff)
+    } else {
+      baseTimeline
+    }
+
+    timeline.getInstants.asScala.map { instant =>
+      Row(instant.requestedTime, instant.getAction, instant.getState.name())
+    }.toSeq
+  }
+}
+
+object ShowInflightCommitsProcedure {
+  /** Procedure name used in `CALL show_inflight_commits(...)`. */
+  val NAME = "show_inflight_commits"
+
+  /** Supplier that builds a new procedure instance on every `get()`. */
+  def builder: Supplier[ProcedureBuilder] =
+    () => new ShowInflightCommitsProcedure()
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..f18a173e29ad
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+class TestCleanupStaleInflightCommitsProcedure extends 
HoodieSparkProcedureTestBase {
+
+  /**
+   * Creates an empty, non-partitioned COW table (metadata table disabled) without writing any rows.
+   *
+   * Tests that inject inflight instants must not insert rows before the injection:
+   * BaseRollbackActionExecutor.validateRollbackCommitSequence throws HoodieRollbackException when
+   * completed instants exist after the injected (older) timestamp and the injected instant has no
+   * heartbeat. With no prior inserts the commit timeline is empty and that check is bypassed.
+   */
+  private def createEmptyTable(tableName: String, tablePath: String): Unit = {
+    val ddl =
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | ts long
+         | ) using hudi
+         | location '$tablePath'
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   type = 'cow',
+         |   preCombineField = 'ts',
+         |   hoodie.metadata.enable = "false"
+         | )
+         |""".stripMargin
+    spark.sql(ddl)
+  }
+
+  /**
+   * Creates an empty table partitioned by `part`, of the given Hudi table type ('cow' or 'mor'),
+   * with the metadata table disabled and no rows written.
+   */
+  private def createEmptyPartitionedTable(tableName: String, tablePath: String, tableType: String): Unit = {
+    val ddl =
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | ts long,
+         | part int
+         | ) using hudi
+         | partitioned by (part)
+         | location '$tablePath'
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   type = '$tableType',
+         |   preCombineField = 'ts',
+         |   hoodie.metadata.enable = "false"
+         | )
+         |""".stripMargin
+    spark.sql(ddl)
+  }
+
+  test("Test cleanup_stale_inflight_commits returns empty when no stale inflights exist") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createEmptyTable(tableName, tmp.getCanonicalPath)
+      // Only completed commits: nothing is pending, so nothing can be stale.
+      spark.sql(s"insert into $tableName values(1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 2000)")
+
+      val result = spark.sql(s"call cleanup_stale_inflight_commits(table => '$tableName')").collect()
+      assertResult(0)(result.length)
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits rolls back stale REPLACE_COMMIT_ACTION inflight") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      // No inserts before injecting — see createEmptyTable docstring
+      createEmptyTable(tableName, tablePath)
+
+      val staleTs = "20200101120000"
+      // Must use REPLACE_COMMIT_ACTION: inflightWriteCommitsOlderThan with
+      // include_ingestion_commits=false (default) filters out COMMIT_ACTION and DELTA_COMMIT_ACTION.
+      // REPLACE_COMMIT_ACTION is included in both getWriteTimeline() and getCommitsTimeline(),
+      // so client.rollback() finds it and returns true.
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, staleTs)
+
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 1)").collect()
+
+      assertResult(1)(result.length)
+      assertResult(staleTs)(result(0).getString(0))
+      assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(result(0).getString(1))
+      assertResult(true)(result(0).getBoolean(2))
+
+      // Verify the instant is no longer pending (requested/inflight) on the active timeline
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+      val remaining = metaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .getInstants.asScala
+      assert(!remaining.exists(_.requestedTime == staleTs),
+        s"Stale instant $staleTs should have been removed from the timeline after rollback")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits respects allowed_inflight_interval_minutes threshold") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      // No inserts before injecting — see createEmptyTable docstring
+      createEmptyTable(tableName, tablePath)
+
+      val staleTs = "20200101120000"
+      val freshTs = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, staleTs)
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, freshTs)
+
+      // 60-minute threshold: only the stale instant qualifies; fresh instant is too recent
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 60)").collect()
+
+      assertResult(1)(result.length)
+      assertResult(staleTs)(result(0).getString(0))
+
+      // The fresh instant must have been left untouched (still pending).
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+      assert(metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(freshTs),
+        s"Fresh instant $freshTs should still be pending")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits cleans COMMIT_ACTION with include_ingestion_commits=true") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      // No inserts before injecting — see createEmptyTable docstring
+      createEmptyTable(tableName, tablePath)
+
+      val staleTs = "20200101120000"
+      // COMMIT_ACTION is filtered out with the default include_ingestion_commits=false,
+      // but included when include_ingestion_commits=true.
+      // client.rollback returns true for COMMIT_ACTION since getCommitsTimeline() includes it.
+      injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, staleTs)
+
+      // Default (include_ingestion_commits=false): should not find COMMIT_ACTION inflight
+      val defaultResult = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 1)").collect()
+      assertResult(0)(defaultResult.length)
+
+      // With include_ingestion_commits=true: should find and process COMMIT_ACTION inflight
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 1, " +
+          s"include_ingestion_commits => true)").collect()
+
+      assertResult(1)(result.length)
+      assertResult(staleTs)(result(0).getString(0))
+      assertResult(HoodieTimeline.COMMIT_ACTION)(result(0).getString(1))
+      assertResult(true)(result(0).getBoolean(2))
+
+      // The rolled-back instant must no longer be pending on the active timeline
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+      assert(!metaClient.reloadActiveTimeline().filterInflightsAndRequested().containsInstant(staleTs),
+        s"Stale instant $staleTs should have been removed from the timeline after rollback")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits dry_run lists matched instants without rolling back") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createEmptyTable(tableName, tablePath)
+
+      val pendingTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, pendingTs)
+
+      val dryRunSql = s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+        s"allowed_inflight_interval_minutes => 1, " +
+        s"dry_run => true)"
+      val rows = spark.sql(dryRunSql).collect()
+
+      assertResult(1)(rows.length)
+      val matched = rows(0)
+      assertResult(pendingTs)(matched.getString(0))
+      assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(matched.getString(1))
+      // In dry_run mode the match is reported but rollback_status is left NULL.
+      assert(matched.isNullAt(2),
+        "Expected rollback_status=NULL in dry_run mode, but got non-null value")
+
+      // Nothing may have been rolled back: the instant must still be pending on the timeline.
+      val timeline = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+        .reloadActiveTimeline()
+      val stillPending = timeline.filterInflightsAndRequested().getInstants.asScala
+        .exists(_.requestedTime == pendingTs)
+      assert(stillPending,
+        s"dry_run should not have rolled back $pendingTs; expected to find it still on the timeline")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits rolls back stale COMPACTION_ACTION inflight via table.rollbackInflightCompaction") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Compaction only exists for MOR tables. A genuine compaction inflight is produced the same
+      // way TestRunRollbackInflightTableServiceProcedure does it: schedule, run, delete the commit.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'mor',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$tablePath'
+       """.stripMargin)
+      withSQLConf(
+        "hoodie.parquet.max.file.size" -> "10000",
+        "hoodie.compact.inline" -> "false",
+        "hoodie.compact.schedule.inline" -> "false",
+        // Auto-clean would add a clean instant once compaction completes, so the newest instant
+        // (getReverseOrderedInstants.findFirst()) would no longer be the compaction commit.
+        "hoodie.clean.automatic" -> "false"
+      ) {
+        // Four inserts plus an update build log files; then schedule and run a compaction.
+        Seq(
+          s"insert into $tableName values(1, 'a1', 10, 1000)",
+          s"insert into $tableName values(2, 'a2', 10, 1000)",
+          s"insert into $tableName values(3, 'a3', 10, 1000)",
+          s"insert into $tableName values(4, 'a4', 10, 1000)",
+          s"update $tableName set price = 11 where id = 1",
+          s"call run_compaction(op => 'schedule', table => '$tableName')",
+          s"call run_compaction(op => 'run', table => '$tableName')"
+        ).foreach(stmt => spark.sql(stmt))
+
+        // Drop the completed compaction commit so only its requested/inflight files remain.
+        val metaClient = HoodieTableMetaClient.builder
+          .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+          .setBasePath(tablePath)
+          .build
+        val compactionInstant = metaClient.getActiveTimeline.getReverseOrderedInstants.findFirst().get()
+        metaClient.getActiveTimeline.deleteInstantFileIfExists(compactionInstant)
+        val compactionInstantTime = compactionInstant.requestedTime
+
+        // Guard the setup: if schedule+run+delete did not leave a pending compaction behind, fail
+        // here with the full timeline in the message instead of later with an empty result.
+        val reloadedTimeline = metaClient.reloadActiveTimeline()
+        val pendingWrites =
+          reloadedTimeline.getWriteTimeline.filterInflightsAndRequested.getInstants.asScala
+        val hasPendingCompaction = pendingWrites.exists { instant =>
+          instant.getAction == HoodieTimeline.COMPACTION_ACTION && instant.requestedTime == compactionInstantTime
+        }
+        assert(hasPendingCompaction,
+          s"Setup failure: compaction inflight at $compactionInstantTime not present after deleting completed commit. " +
+            s"Active timeline: ${reloadedTimeline.getInstants.asScala.map(i => s"${i.requestedTime}/${i.getAction}/${i.getState}").mkString(", ")}")
+
+        // Wait so the second-precision cutoff timestamp is strictly newer than the inflight's.
+        Thread.sleep(2000)
+
+        val rows = spark.sql(
+          s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+            s"allowed_inflight_interval_minutes => 0)").collect()
+
+        val compactionRow = rows.find(_.getString(0) == compactionInstantTime)
+        assert(compactionRow.isDefined,
+          s"Expected compaction inflight $compactionInstantTime in result; got ${rows.map(r => s"${r.getString(0)}/${r.getString(1)}").mkString(",")}")
+        val matched = compactionRow.get
+        assertResult(HoodieTimeline.COMPACTION_ACTION)(matched.getString(1))
+        assertResult(true)(matched.getBoolean(2))
+
+        // table.rollbackInflightCompaction must have removed the INFLIGHT compaction instant.
+        val expectedInflight = metaClient.getTimelineLayout.getInstantGenerator.createNewInstant(
+          HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)
+        assert(!metaClient.reloadActiveTimeline().getInstants.contains(expectedInflight),
+          s"Compaction inflight $compactionInstantTime should be gone after rollback")
+      }
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits rolls back stale CLUSTERING_ACTION inflight via table.rollbackInflightClustering") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Use the real schedule + execute + delete-commit pattern from
+      // TestRunRollbackInflightTableServiceProcedure so the clustering inflight is valid.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$tablePath'
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+
+      // Delete the completed clustering commit so only its requested/inflight files remain.
+      // Fail with a clear message (rather than IndexOutOfBounds) if clustering produced nothing.
+      val completedReplaces = metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants
+      assert(!completedReplaces.isEmpty,
+        "Setup failure: run_clustering did not produce a completed replace commit")
+      val clusteringInstant = completedReplaces.get(0)
+      metaClient.getActiveTimeline.deleteInstantFileIfExists(clusteringInstant)
+      val clusteringInstantTime = clusteringInstant.requestedTime
+
+      // Guard the setup: the clustering instant must now be pending; otherwise the rest of the
+      // test is moot, so report the full timeline instead of failing later on an empty result.
+      val reloadedTimeline = metaClient.reloadActiveTimeline()
+      assert(
+        reloadedTimeline.filterInflightsAndRequested().getInstants.asScala
+          .exists(_.requestedTime == clusteringInstantTime),
+        s"Setup failure: clustering inflight at $clusteringInstantTime not present after deleting completed commit. " +
+          s"Active timeline: ${reloadedTimeline.getInstants.asScala.map(i => s"${i.requestedTime}/${i.getAction}/${i.getState}").mkString(", ")}")
+
+      // Sleep so the second-precision cutoff timestamp is strictly newer than the inflight's timestamp
+      Thread.sleep(2000)
+
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 0)").collect()
+
+      val clusteringRow = result.find(r => r.getString(0) == clusteringInstantTime)
+      assert(clusteringRow.isDefined,
+        s"Expected clustering inflight $clusteringInstantTime in result; got ${result.map(_.getString(0)).mkString(",")}")
+      assertResult(HoodieTimeline.CLUSTERING_ACTION)(clusteringRow.get.getString(1))
+      assertResult(true)(clusteringRow.get.getBoolean(2))
+
+      // table.rollbackInflightClustering must have removed the INFLIGHT clustering instant.
+      val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
+      val expectedInflight = instantGenerator.createNewInstant(
+        HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, clusteringInstantTime)
+      assert(!metaClient.reloadActiveTimeline().getInstants.contains(expectedInflight),
+        s"Clustering inflight $clusteringInstantTime should be gone after rollback")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits handles partitioned COW table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createEmptyPartitionedTable(tableName, tablePath, "cow")
+
+      val replaceTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, replaceTs)
+
+      val cleanupCall =
+        s"call cleanup_stale_inflight_commits(table => '$tableName', allowed_inflight_interval_minutes => 1)"
+      val rows = spark.sql(cleanupCall).collect()
+
+      // Exactly one row: the injected replace commit, reported as successfully rolled back.
+      assertResult(1)(rows.length)
+      val row = rows(0)
+      assertResult(replaceTs)(row.getString(0))
+      assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(row.getString(1))
+      assertResult(true)(row.getBoolean(2))
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits handles MOR table DELTA_COMMIT_ACTION with include_ingestion_commits") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createEmptyPartitionedTable(tableName, tablePath, "mor")
+
+      val deltaCommitTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.DELTA_COMMIT_ACTION, deltaCommitTs)
+
+      // Shared prefix of both CALL statements; each caller closes the argument list itself.
+      val baseCall = s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+        s"allowed_inflight_interval_minutes => 1"
+
+      // DELTA_COMMIT_ACTION is an ingestion write, so the default run skips it.
+      assertResult(0)(spark.sql(baseCall + ")").collect().length)
+
+      // Opting in with include_ingestion_commits=true makes it eligible for rollback.
+      val rows = spark.sql(baseCall + ", include_ingestion_commits => true)").collect()
+      assertResult(1)(rows.length)
+
+      val row = rows(0)
+      assertResult(deltaCommitTs)(row.getString(0))
+      assertResult(HoodieTimeline.DELTA_COMMIT_ACTION)(row.getString(1))
+      assertResult(true)(row.getBoolean(2))
+    }
+  }
+
+  /**
+   * Writes a REQUESTED instant for `action` at `instantTime`, then transitions it to INFLIGHT
+   * without completing it, simulating an abandoned (stale) operation for tests.
+   *
+   * @param tablePath   base path of the Hudi table
+   * @param action      timeline action, e.g. HoodieTimeline.COMMIT_ACTION
+   * @param instantTime timestamp to give the injected instant
+   */
+  private def injectInflightInstant(tablePath: String, action: String, instantTime: String): Unit = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+      .setBasePath(tablePath)
+      .build
+    val requestedInstant = metaClient.getTimelineLayout.getInstantGenerator
+      .createNewInstant(HoodieInstant.State.REQUESTED, action, instantTime)
+    val activeTimeline = metaClient.getActiveTimeline
+    activeTimeline.createNewInstant(requestedInstant)
+    // No commit metadata is attached to the inflight transition.
+    activeTimeline.transitionRequestedToInflight(requestedInstant, HOption.empty[Array[Byte]]())
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..ca77172bd607
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+class TestShowInflightCommitsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test show_inflight_commits returns empty for a fully committed table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createMetadataDisabledCowTable(tableName, tmp.getCanonicalPath)
+      spark.sql(s"insert into $tableName values(1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 2000)")
+
+      val result = spark.sql(s"call show_inflight_commits(table => '$tableName')").collect()
+      assertResult(0)(result.length)
+    }
+  }
+
+  test("Test show_inflight_commits returns injected inflight instant") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createMetadataDisabledCowTable(tableName, tablePath)
+
+      val injectedTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, injectedTs)
+
+      val result = spark.sql(s"call show_inflight_commits(table => '$tableName')").collect()
+      val row = result.find(_.getString(0) == injectedTs)
+      assert(row.isDefined, s"Expected inflight instant $injectedTs not found in results")
+      assertResult(HoodieTimeline.COMMIT_ACTION)(row.get.getString(1))
+      assertResult("INFLIGHT")(row.get.getString(2))
+    }
+  }
+
+  test("Test show_inflight_commits min_age_minutes filter includes old and excludes recent") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createMetadataDisabledCowTable(tableName, tablePath)
+
+      val oldTs   = "20200101120000"
+      val freshTs = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+
+      injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, oldTs)
+      injectInflightInstant(tablePath, HoodieTimeline.DELTA_COMMIT_ACTION, freshTs)
+
+      // min_age_minutes = 0 applies no age restriction, so both instants should appear
+      val allResults = spark.sql(
+        s"call show_inflight_commits(table => '$tableName', min_age_minutes => 0)").collect()
+      assert(allResults.exists(_.getString(0) == oldTs),
+        s"Expected old instant $oldTs to appear with min_age_minutes=0")
+      assert(allResults.exists(_.getString(0) == freshTs),
+        s"Expected fresh instant $freshTs to appear with min_age_minutes=0")
+
+      // 60-minute filter: only the old instant should appear
+      val filteredResults = spark.sql(
+        s"call show_inflight_commits(table => '$tableName', min_age_minutes => 60)").collect()
+      assert(filteredResults.exists(_.getString(0) == oldTs),
+        s"Expected old instant $oldTs to appear with min_age_minutes=60")
+      assert(!filteredResults.exists(_.getString(0) == freshTs),
+        s"Fresh instant $freshTs should not appear with min_age_minutes=60")
+    }
+  }
+
+  test("Test show_inflight_commits requires table parameter") {
+    checkExceptionContain(
+      "call show_inflight_commits()")(
+      "Argument: table is required")
+  }
+
+  /**
+   * Creates a non-partitioned COW table (id, name, ts) at `tablePath` with the metadata table
+   * disabled (hoodie.metadata.enable = false). Shared by the tests above, which previously
+   * repeated this DDL verbatim.
+   */
+  private def createMetadataDisabledCowTable(tableName: String, tablePath: String): Unit = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | ts long
+         | ) using hudi
+         | location '$tablePath'
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   type = 'cow',
+         |   preCombineField = 'ts',
+         |   hoodie.metadata.enable = "false"
+         | )
+         |""".stripMargin)
+  }
+
+  /**
+   * Injects a REQUESTED→INFLIGHT instant into the active timeline without completing it.
+   * Used to simulate stale inflight operations for testing.
+   */
+  private def injectInflightInstant(tablePath: String, action: String, instantTime: String): Unit = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+      .setBasePath(tablePath)
+      .build
+    val timeline = metaClient.getActiveTimeline
+    val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
+    val requested = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, action, instantTime)
+    timeline.createNewInstant(requested)
+    timeline.transitionRequestedToInflight(requested, HOption.empty[Array[Byte]]())
+  }
+}

Reply via email to