This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 16c9dd6c07 Spark 4.0: Refactor Spark procedures to consistently use 
ProcedureInput for parameter handling. (#13913)
16c9dd6c07 is described below

commit 16c9dd6c07d3288ef2b3d504c6af7ba817159d48
Author: slfan1989 <[email protected]>
AuthorDate: Wed Sep 24 23:48:10 2025 +0800

    Spark 4.0: Refactor Spark procedures to consistently use ProcedureInput for 
parameter handling. (#13913)
---
 .../TestCherrypickSnapshotProcedure.java           |  2 +-
 .../extensions/TestExpireSnapshotsProcedure.java   |  2 +-
 .../extensions/TestFastForwardBranchProcedure.java |  2 +-
 .../extensions/TestPublishChangesProcedure.java    |  2 +-
 .../extensions/TestRemoveOrphanFilesProcedure.java |  2 +-
 .../extensions/TestRewriteManifestsProcedure.java  |  2 +-
 .../TestRollbackToSnapshotProcedure.java           |  2 +-
 .../TestSetCurrentSnapshotProcedure.java           |  4 +-
 .../procedures/CherrypickSnapshotProcedure.java    | 16 ++--
 .../spark/procedures/ExpireSnapshotsProcedure.java | 45 ++++++----
 .../procedures/FastForwardBranchProcedure.java     | 21 +++--
 .../iceberg/spark/procedures/ProcedureInput.java   | 33 ++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 16 ++--
 .../spark/procedures/RegisterTableProcedure.java   | 16 ++--
 .../procedures/RemoveOrphanFilesProcedure.java     | 96 +++++++++++-----------
 .../procedures/RewriteManifestsProcedure.java      | 20 +++--
 .../procedures/RollbackToSnapshotProcedure.java    | 15 ++--
 .../procedures/RollbackToTimestampProcedure.java   | 16 ++--
 .../procedures/SetCurrentSnapshotProcedure.java    | 20 +++--
 19 files changed, 207 insertions(+), 125 deletions(-)

diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
index fa9e209c1d..a236f13f12 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
@@ -184,7 +184,7 @@ public class TestCherrypickSnapshotProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
 
     assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t', 
'2.2')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 75bbff1fc7..c116cb4f85 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -189,7 +189,7 @@ public class TestExpireSnapshotsProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
index c8762a9436..69920e1d54 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -190,7 +190,7 @@ public class TestFastForwardBranchProcedure extends 
ExtensionsTestBase {
     assertThatThrownBy(
             () -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 3a2c7a6333..4958fde15d 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -182,6 +182,6 @@ public class TestPublishChangesProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('', 
'not_valid')", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 }
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index f5869cf0a8..a5ac8a7e01 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -271,7 +271,7 @@ public class TestRemoveOrphanFilesProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index a6896715ca..b8dca4b2cd 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -326,7 +326,7 @@ public class TestRewriteManifestsProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 
   @TestTemplate
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
index eee7bed8b0..cffc65f5dc 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
@@ -281,6 +281,6 @@ public class TestRollbackToSnapshotProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('', 
1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
   }
 }
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
index 0e0ae57063..ab0eca78d5 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
@@ -214,7 +214,7 @@ public class TestSetCurrentSnapshotProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)", 
catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse identifier for arg table: 1");
+        .hasMessage("Cannot parse identifier for parameter 'table': 1");
 
     assertThatThrownBy(
             () -> sql("CALL %s.system.set_current_snapshot(snapshot_id => 
1L)", catalogName))
@@ -233,7 +233,7 @@ public class TestSetCurrentSnapshotProcedure extends 
ExtensionsTestBase {
 
     assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('', 
1L)", catalogName))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot handle an empty identifier for argument table");
+        .hasMessage("Cannot handle an empty identifier for parameter 'table'");
 
     assertThatThrownBy(
             () ->
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
index b24394f83b..31043b953b 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
@@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure {
 
   static final String NAME = "cherrypick_snapshot";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      requiredInParameter("snapshot_id", DataTypes.LongType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("snapshot_id", DataTypes.LongType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -83,8 +85,10 @@ class CherrypickSnapshotProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
 
     return asScanIterator(
         OUTPUT_TYPE,
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 7ff85c752b..2b771914c3 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -26,7 +26,6 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure 
{
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter OLDER_THAN_PARAM =
+      optionalInParameter("older_than", DataTypes.TimestampType);
+  private static final ProcedureParameter RETAIN_LAST_PARAM =
+      optionalInParameter("retain_last", DataTypes.IntegerType);
+  private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+      optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+  private static final ProcedureParameter STREAM_RESULTS_PARAM =
+      optionalInParameter("stream_results", DataTypes.BooleanType);
+  private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
+      optionalInParameter("snapshot_ids", 
DataTypes.createArrayType(DataTypes.LongType));
+  private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
+      optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);
+
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        optionalInParameter("older_than", DataTypes.TimestampType),
-        optionalInParameter("retain_last", DataTypes.IntegerType),
-        optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
-        optionalInParameter("stream_results", DataTypes.BooleanType),
-        optionalInParameter("snapshot_ids", 
DataTypes.createArrayType(DataTypes.LongType)),
-        optionalInParameter("clean_expired_metadata", DataTypes.BooleanType)
+        TABLE_PARAM,
+        OLDER_THAN_PARAM,
+        RETAIN_LAST_PARAM,
+        MAX_CONCURRENT_DELETES_PARAM,
+        STREAM_RESULTS_PARAM,
+        SNAPSHOT_IDS_PARAM,
+        CLEAN_EXPIRED_METADATA_PARAM
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -104,13 +118,14 @@ public class ExpireSnapshotsProcedure extends 
BaseProcedure {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtil.microsToMillis(args.getLong(1));
-    Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
-    Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
-    Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
-    long[] snapshotIds = args.isNullAt(5) ? null : 
args.getArray(5).toLongArray();
-    Boolean cleanExpiredMetadata = args.isNullAt(6) ? null : 
args.getBoolean(6);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+    Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
+    Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, 
null);
+    Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null);
+    long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
+    Boolean cleanExpiredMetadata = 
input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null);
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
index 367f7d2a4d..d7531e7594 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -35,12 +35,15 @@ public class FastForwardBranchProcedure extends 
BaseProcedure {
 
   static final String NAME = "fast_forward";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter BRANCH_PARAM =
+      requiredInParameter("branch", DataTypes.StringType);
+  private static final ProcedureParameter TO_PARAM =
+      requiredInParameter("to", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("branch", DataTypes.StringType),
-        requiredInParameter("to", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -75,9 +78,11 @@ public class FastForwardBranchProcedure extends 
BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    String from = args.getString(1);
-    String to = args.getString(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String from = input.asString(BRANCH_PARAM);
+    String to = input.asString(TO_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
index 14f891c10b..8f6dbdcf5a 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
@@ -26,6 +26,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -80,6 +81,22 @@ class ProcedureInput {
     return args.isNullAt(ordinal) ? defaultValue : (Integer) 
args.getInt(ordinal);
   }
 
+  public long asTimestampMillis(ProcedureParameter param) {
+    Long value = asTimestampMillis(param, null);
+    Preconditions.checkArgument(value != null, "Parameter '%s' is not set", 
param.name());
+    return value;
+  }
+
+  public Long asTimestampMillis(ProcedureParameter param, Long defaultValue) {
+    validateParamType(param, DataTypes.TimestampType);
+    int ordinal = ordinal(param);
+    Long value = args.isNullAt(ordinal) ? defaultValue : (Long) 
args.getLong(ordinal);
+    if (value != null) {
+      value = DateTimeUtil.microsToMillis(value);
+    }
+    return value;
+  }
+
   public long asLong(ProcedureParameter param) {
     Long value = asLong(param, null);
     Preconditions.checkArgument(value != null, "Parameter '%s' is not set", 
param.name());
@@ -92,6 +109,22 @@ class ProcedureInput {
     return args.isNullAt(ordinal) ? defaultValue : (Long) 
args.getLong(ordinal);
   }
 
+  public long[] asLongArray(ProcedureParameter param, Long[] defaultValue) {
+    validateParamType(param, DataTypes.createArrayType(DataTypes.LongType));
+    Long[] source =
+        array(param, (array, ordinal) -> array.getLong(ordinal), Long.class, 
defaultValue);
+
+    if (source == null) {
+      return null;
+    }
+
+    long[] result = new long[source.length];
+    for (int i = 0; i < source.length; i++) {
+      result[i] = source[i];
+    }
+    return result;
+  }
+
   public String asString(ProcedureParameter param) {
     String value = asString(param, null);
     Preconditions.checkArgument(value != null, "Parameter '%s' is not set", 
param.name());
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 872972e22e..8748882043 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -49,11 +49,13 @@ class PublishChangesProcedure extends BaseProcedure {
 
   static final String NAME = "publish_changes";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter WAP_ID_PARAM =
+      requiredInParameter("wap_id", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("wap_id", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -87,8 +89,10 @@ class PublishChangesProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    String wapId = args.getString(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    String wapId = input.asString(WAP_ID_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
index d4c68b1e7d..9ba577ad7e 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
@@ -42,11 +42,13 @@ class RegisterTableProcedure extends BaseProcedure {
 
   static final String NAME = "register_table";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter METADATA_FILE_PARAM =
+      requiredInParameter("metadata_file", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("metadata_file", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -81,9 +83,11 @@ class RegisterTableProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
     TableIdentifier tableName =
-        Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0), 
"table"));
-    String metadataFile = args.getString(1);
+        Spark3Util.identifierToTableIdentifier(
+            toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name()));
+    String metadataFile = input.asString(METADATA_FILE_PARAM);
     Preconditions.checkArgument(
         tableCatalog() instanceof HasIcebergCatalog,
         "Cannot use Register Table in a non-Iceberg catalog");
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index ac3c29156e..f30f99978c 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -26,12 +26,11 @@ import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -45,7 +44,6 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
 
 /**
  * A procedure that removes orphan files in a table.
@@ -58,19 +56,40 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter OLDER_THAN_PARAM =
+      optionalInParameter("older_than", DataTypes.TimestampType);
+  private static final ProcedureParameter LOCATION_PARAM =
+      optionalInParameter("location", DataTypes.StringType);
+  private static final ProcedureParameter DRY_RUN_PARAM =
+      optionalInParameter("dry_run", DataTypes.BooleanType);
+  private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+      optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+  private static final ProcedureParameter FILE_LIST_VIEW_PARAM =
+      optionalInParameter("file_list_view", DataTypes.StringType);
+  private static final ProcedureParameter EQUAL_SCHEMES_PARAM =
+      optionalInParameter("equal_schemes", STRING_MAP);
+  private static final ProcedureParameter EQUAL_AUTHORITIES_PARAM =
+      optionalInParameter("equal_authorities", STRING_MAP);
+  private static final ProcedureParameter PREFIX_MISMATCH_MODE_PARAM =
+      optionalInParameter("prefix_mismatch_mode", DataTypes.StringType);
+  // List files with prefix operations. Default is false.
+  private static final ProcedureParameter PREFIX_LISTING_PARAM =
+      optionalInParameter("prefix_listing", DataTypes.BooleanType);
+
   private static final ProcedureParameter[] PARAMETERS =
       new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        optionalInParameter("older_than", DataTypes.TimestampType),
-        optionalInParameter("location", DataTypes.StringType),
-        optionalInParameter("dry_run", DataTypes.BooleanType),
-        optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
-        optionalInParameter("file_list_view", DataTypes.StringType),
-        optionalInParameter("equal_schemes", STRING_MAP),
-        optionalInParameter("equal_authorities", STRING_MAP),
-        optionalInParameter("prefix_mismatch_mode", DataTypes.StringType),
-        // List files with prefix operations. Default is false.
-        optionalInParameter("prefix_listing", DataTypes.BooleanType)
+        TABLE_PARAM,
+        OLDER_THAN_PARAM,
+        LOCATION_PARAM,
+        DRY_RUN_PARAM,
+        MAX_CONCURRENT_DELETES_PARAM,
+        FILE_LIST_VIEW_PARAM,
+        EQUAL_SCHEMES_PARAM,
+        EQUAL_AUTHORITIES_PARAM,
+        PREFIX_MISMATCH_MODE_PARAM,
+        PREFIX_LISTING_PARAM
       };
 
   private static final StructType OUTPUT_TYPE =
@@ -105,46 +124,26 @@ public class RemoveOrphanFilesProcedure extends 
BaseProcedure {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-    Long olderThanMillis = args.isNullAt(1) ? null : 
DateTimeUtil.microsToMillis(args.getLong(1));
-    String location = args.isNullAt(2) ? null : args.getString(2);
-    boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
-    Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
-    String fileListView = args.isNullAt(5) ? null : args.getString(5);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), 
PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+    String location = input.asString(LOCATION_PARAM, null);
+    boolean dryRun = input.asBoolean(DRY_RUN_PARAM, false);
+    Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM, 
null);
+    String fileListView = input.asString(FILE_LIST_VIEW_PARAM, null);
 
     Preconditions.checkArgument(
         maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0, value: %s",
         maxConcurrentDeletes);
 
-    Map<String, String> equalSchemes = Maps.newHashMap();
-    if (!args.isNullAt(6)) {
-      args.getMap(6)
-          .foreach(
-              DataTypes.StringType,
-              DataTypes.StringType,
-              (k, v) -> {
-                equalSchemes.put(k.toString(), v.toString());
-                return BoxedUnit.UNIT;
-              });
-    }
-
-    Map<String, String> equalAuthorities = Maps.newHashMap();
-    if (!args.isNullAt(7)) {
-      args.getMap(7)
-          .foreach(
-              DataTypes.StringType,
-              DataTypes.StringType,
-              (k, v) -> {
-                equalAuthorities.put(k.toString(), v.toString());
-                return BoxedUnit.UNIT;
-              });
-    }
+    Map<String, String> equalSchemes = input.asStringMap(EQUAL_SCHEMES_PARAM, 
ImmutableMap.of());
+    Map<String, String> equalAuthorities =
+        input.asStringMap(EQUAL_AUTHORITIES_PARAM, ImmutableMap.of());
 
-    PrefixMismatchMode prefixMismatchMode =
-        args.isNullAt(8) ? null : 
PrefixMismatchMode.fromString(args.getString(8));
+    PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input, 
PREFIX_MISMATCH_MODE_PARAM);
 
-    boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9);
+    boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false);
 
     return withIcebergTable(
         tableIdent,
@@ -236,4 +235,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
   public String description() {
     return "RemoveOrphanFilesProcedure";
   }
+
+  private PrefixMismatchMode asPrefixMismatchMode(ProcedureInput input, ProcedureParameter param) {
+    String modeAsString = input.asString(param, null);
+    return (modeAsString == null) ? null : PrefixMismatchMode.fromString(modeAsString);
+  }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index 9b9a9d0688..d4bd0764ee 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -48,12 +48,15 @@ class RewriteManifestsProcedure extends BaseProcedure {
 
   static final String NAME = "rewrite_manifests";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter USE_CACHING_PARAM =
+      optionalInParameter("use_caching", DataTypes.BooleanType);
+  private static final ProcedureParameter SPEC_ID_PARAM =
+      optionalInParameter("spec_id", DataTypes.IntegerType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        optionalInParameter("use_caching", DataTypes.BooleanType),
-        optionalInParameter("spec_id", DataTypes.IntegerType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, USE_CACHING_PARAM, SPEC_ID_PARAM};
 
   // counts are not nullable since the action result is never null
   private static final StructType OUTPUT_TYPE =
@@ -89,9 +92,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
-    Integer specId = args.isNullAt(2) ? null : args.getInt(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Boolean useCaching = input.asBoolean(USE_CACHING_PARAM, null);
+    Integer specId = input.asInt(SPEC_ID_PARAM, null);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
index b8a5368e77..98e1e2b870 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
@@ -44,11 +44,13 @@ class RollbackToSnapshotProcedure extends BaseProcedure {
 
   static final String NAME = "rollback_to_snapshot";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      requiredInParameter("snapshot_id", DataTypes.LongType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("snapshot_id", DataTypes.LongType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -82,8 +84,9 @@ class RollbackToSnapshotProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
index 5f7b0064c2..fd6791df28 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.spark.procedures;
 import java.util.Iterator;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -45,11 +44,13 @@ class RollbackToTimestampProcedure extends BaseProcedure {
 
   static final String NAME = "rollback_to_timestamp";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter TIMESTAMP_PARAM =
+      requiredInParameter("timestamp", DataTypes.TimestampType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        requiredInParameter("timestamp", DataTypes.TimestampType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, TIMESTAMP_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -83,9 +84,10 @@ class RollbackToTimestampProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
     // timestamps in Spark have microsecond precision so this conversion is lossy
-    long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1));
+    long timestampMillis = input.asTimestampMillis(TIMESTAMP_PARAM);
 
     return modifyIcebergTable(
         tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
index edd13fc095..18e3646cf5 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
@@ -48,12 +48,15 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
 
   static final String NAME = "set_current_snapshot";
 
+  private static final ProcedureParameter TABLE_PARAM =
+      requiredInParameter("table", DataTypes.StringType);
+  private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+      optionalInParameter("snapshot_id", DataTypes.LongType);
+  private static final ProcedureParameter REF_PARAM =
+      optionalInParameter("ref", DataTypes.StringType);
+
   private static final ProcedureParameter[] PARAMETERS =
-      new ProcedureParameter[] {
-        requiredInParameter("table", DataTypes.StringType),
-        optionalInParameter("snapshot_id", DataTypes.LongType),
-        optionalInParameter("ref", DataTypes.StringType)
-      };
+      new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, REF_PARAM};
 
   private static final StructType OUTPUT_TYPE =
       new StructType(
@@ -87,9 +90,10 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
 
   @Override
   public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
-    String ref = args.isNullAt(2) ? null : args.getString(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+    String ref = input.asString(REF_PARAM, null);
     Preconditions.checkArgument(
         (snapshotId != null && ref == null) || (snapshotId == null && ref != null),
         "Either snapshot_id or ref must be provided, not both");


Reply via email to