This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 16c9dd6c07 Spark 4.0: Refactor Spark procedures to consistently use
ProcedureInput for parameter handling. (#13913)
16c9dd6c07 is described below
commit 16c9dd6c07d3288ef2b3d504c6af7ba817159d48
Author: slfan1989 <[email protected]>
AuthorDate: Wed Sep 24 23:48:10 2025 +0800
Spark 4.0: Refactor Spark procedures to consistently use ProcedureInput for
parameter handling. (#13913)
---
.../TestCherrypickSnapshotProcedure.java | 2 +-
.../extensions/TestExpireSnapshotsProcedure.java | 2 +-
.../extensions/TestFastForwardBranchProcedure.java | 2 +-
.../extensions/TestPublishChangesProcedure.java | 2 +-
.../extensions/TestRemoveOrphanFilesProcedure.java | 2 +-
.../extensions/TestRewriteManifestsProcedure.java | 2 +-
.../TestRollbackToSnapshotProcedure.java | 2 +-
.../TestSetCurrentSnapshotProcedure.java | 4 +-
.../procedures/CherrypickSnapshotProcedure.java | 16 ++--
.../spark/procedures/ExpireSnapshotsProcedure.java | 45 ++++++----
.../procedures/FastForwardBranchProcedure.java | 21 +++--
.../iceberg/spark/procedures/ProcedureInput.java | 33 ++++++++
.../spark/procedures/PublishChangesProcedure.java | 16 ++--
.../spark/procedures/RegisterTableProcedure.java | 16 ++--
.../procedures/RemoveOrphanFilesProcedure.java | 96 +++++++++++-----------
.../procedures/RewriteManifestsProcedure.java | 20 +++--
.../procedures/RollbackToSnapshotProcedure.java | 15 ++--
.../procedures/RollbackToTimestampProcedure.java | 16 ++--
.../procedures/SetCurrentSnapshotProcedure.java | 20 +++--
19 files changed, 207 insertions(+), 125 deletions(-)
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
index fa9e209c1d..a236f13f12 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCherrypickSnapshotProcedure.java
@@ -184,7 +184,7 @@ public class TestCherrypickSnapshotProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('', 1L)",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
assertThatThrownBy(() -> sql("CALL %s.system.cherrypick_snapshot('t',
'2.2')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index 75bbff1fc7..c116cb4f85 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -189,7 +189,7 @@ public class TestExpireSnapshotsProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.expire_snapshots('')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
@TestTemplate
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
index c8762a9436..69920e1d54 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestFastForwardBranchProcedure.java
@@ -190,7 +190,7 @@ public class TestFastForwardBranchProcedure extends
ExtensionsTestBase {
assertThatThrownBy(
() -> sql("CALL %s.system.fast_forward('', 'main', 'newBranch')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
@TestTemplate
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 3a2c7a6333..4958fde15d 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -182,6 +182,6 @@ public class TestPublishChangesProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.publish_changes('',
'not_valid')", catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index f5869cf0a8..a5ac8a7e01 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -271,7 +271,7 @@ public class TestRemoveOrphanFilesProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.remove_orphan_files('')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
@TestTemplate
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index a6896715ca..b8dca4b2cd 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -326,7 +326,7 @@ public class TestRewriteManifestsProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.rewrite_manifests('')",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
@TestTemplate
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
index eee7bed8b0..cffc65f5dc 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRollbackToSnapshotProcedure.java
@@ -281,6 +281,6 @@ public class TestRollbackToSnapshotProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.rollback_to_snapshot('',
1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
}
}
diff --git
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
index 0e0ae57063..ab0eca78d5 100644
---
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
+++
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetCurrentSnapshotProcedure.java
@@ -214,7 +214,7 @@ public class TestSetCurrentSnapshotProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot(1L)",
catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot parse identifier for arg table: 1");
+ .hasMessage("Cannot parse identifier for parameter 'table': 1");
assertThatThrownBy(
() -> sql("CALL %s.system.set_current_snapshot(snapshot_id =>
1L)", catalogName))
@@ -233,7 +233,7 @@ public class TestSetCurrentSnapshotProcedure extends
ExtensionsTestBase {
assertThatThrownBy(() -> sql("CALL %s.system.set_current_snapshot('',
1L)", catalogName))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Cannot handle an empty identifier for argument table");
+ .hasMessage("Cannot handle an empty identifier for parameter 'table'");
assertThatThrownBy(
() ->
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
index b24394f83b..31043b953b 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CherrypickSnapshotProcedure.java
@@ -45,11 +45,13 @@ class CherrypickSnapshotProcedure extends BaseProcedure {
static final String NAME = "cherrypick_snapshot";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+ requiredInParameter("snapshot_id", DataTypes.LongType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("snapshot_id", DataTypes.LongType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -83,8 +85,10 @@ class CherrypickSnapshotProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- long snapshotId = args.getLong(1);
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
+
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
return asScanIterator(
OUTPUT_TYPE,
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 7ff85c752b..2b771914c3 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -26,7 +26,6 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.actions.ExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -51,15 +50,30 @@ public class ExpireSnapshotsProcedure extends BaseProcedure
{
private static final Logger LOG =
LoggerFactory.getLogger(ExpireSnapshotsProcedure.class);
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter OLDER_THAN_PARAM =
+ optionalInParameter("older_than", DataTypes.TimestampType);
+ private static final ProcedureParameter RETAIN_LAST_PARAM =
+ optionalInParameter("retain_last", DataTypes.IntegerType);
+ private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+ optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+ private static final ProcedureParameter STREAM_RESULTS_PARAM =
+ optionalInParameter("stream_results", DataTypes.BooleanType);
+ private static final ProcedureParameter SNAPSHOT_IDS_PARAM =
+ optionalInParameter("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType));
+ private static final ProcedureParameter CLEAN_EXPIRED_METADATA_PARAM =
+ optionalInParameter("clean_expired_metadata", DataTypes.BooleanType);
+
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- optionalInParameter("older_than", DataTypes.TimestampType),
- optionalInParameter("retain_last", DataTypes.IntegerType),
- optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
- optionalInParameter("stream_results", DataTypes.BooleanType),
- optionalInParameter("snapshot_ids",
DataTypes.createArrayType(DataTypes.LongType)),
- optionalInParameter("clean_expired_metadata", DataTypes.BooleanType)
+ TABLE_PARAM,
+ OLDER_THAN_PARAM,
+ RETAIN_LAST_PARAM,
+ MAX_CONCURRENT_DELETES_PARAM,
+ STREAM_RESULTS_PARAM,
+ SNAPSHOT_IDS_PARAM,
+ CLEAN_EXPIRED_METADATA_PARAM
};
private static final StructType OUTPUT_TYPE =
@@ -104,13 +118,14 @@ public class ExpireSnapshotsProcedure extends
BaseProcedure {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Iterator<Scan> call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
- Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
- Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
- Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
- long[] snapshotIds = args.isNullAt(5) ? null :
args.getArray(5).toLongArray();
- Boolean cleanExpiredMetadata = args.isNullAt(6) ? null :
args.getBoolean(6);
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+ Integer retainLastNum = input.asInt(RETAIN_LAST_PARAM, null);
+ Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM,
null);
+ Boolean streamResult = input.asBoolean(STREAM_RESULTS_PARAM, null);
+ long[] snapshotIds = input.asLongArray(SNAPSHOT_IDS_PARAM, null);
+ Boolean cleanExpiredMetadata =
input.asBoolean(CLEAN_EXPIRED_METADATA_PARAM, null);
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
index 367f7d2a4d..d7531e7594 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/FastForwardBranchProcedure.java
@@ -35,12 +35,15 @@ public class FastForwardBranchProcedure extends
BaseProcedure {
static final String NAME = "fast_forward";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter BRANCH_PARAM =
+ requiredInParameter("branch", DataTypes.StringType);
+ private static final ProcedureParameter TO_PARAM =
+ requiredInParameter("to", DataTypes.StringType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("branch", DataTypes.StringType),
- requiredInParameter("to", DataTypes.StringType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, BRANCH_PARAM, TO_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -75,9 +78,11 @@ public class FastForwardBranchProcedure extends
BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- String from = args.getString(1);
- String to = args.getString(2);
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
+
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ String from = input.asString(BRANCH_PARAM);
+ String to = input.asString(TO_PARAM);
return modifyIcebergTable(
tableIdent,
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
index 14f891c10b..8f6dbdcf5a 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/ProcedureInput.java
@@ -26,6 +26,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -80,6 +81,22 @@ class ProcedureInput {
return args.isNullAt(ordinal) ? defaultValue : (Integer)
args.getInt(ordinal);
}
+ public long asTimestampMillis(ProcedureParameter param) {
+ Long value = asTimestampMillis(param, null);
+ Preconditions.checkArgument(value != null, "Parameter '%s' is not set",
param.name());
+ return value;
+ }
+
+ public Long asTimestampMillis(ProcedureParameter param, Long defaultValue) {
+ validateParamType(param, DataTypes.TimestampType);
+ int ordinal = ordinal(param);
+ Long value = args.isNullAt(ordinal) ? defaultValue : (Long)
args.getLong(ordinal);
+ if (value != null) {
+ value = DateTimeUtil.microsToMillis(value);
+ }
+ return value;
+ }
+
public long asLong(ProcedureParameter param) {
Long value = asLong(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set",
param.name());
@@ -92,6 +109,22 @@ class ProcedureInput {
return args.isNullAt(ordinal) ? defaultValue : (Long)
args.getLong(ordinal);
}
+ public long[] asLongArray(ProcedureParameter param, Long[] defaultValue) {
+ validateParamType(param, DataTypes.createArrayType(DataTypes.LongType));
+ Long[] source =
+ array(param, (array, ordinal) -> array.getLong(ordinal), Long.class,
defaultValue);
+
+ if (source == null) {
+ return null;
+ }
+
+ long[] result = new long[source.length];
+ for (int i = 0; i < source.length; i++) {
+ result[i] = source[i];
+ }
+ return result;
+ }
+
public String asString(ProcedureParameter param) {
String value = asString(param, null);
Preconditions.checkArgument(value != null, "Parameter '%s' is not set",
param.name());
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 872972e22e..8748882043 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -49,11 +49,13 @@ class PublishChangesProcedure extends BaseProcedure {
static final String NAME = "publish_changes";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter WAP_ID_PARAM =
+ requiredInParameter("wap_id", DataTypes.StringType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("wap_id", DataTypes.StringType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, WAP_ID_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -87,8 +89,10 @@ class PublishChangesProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- String wapId = args.getString(1);
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
+
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ String wapId = input.asString(WAP_ID_PARAM);
return modifyIcebergTable(
tableIdent,
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
index d4c68b1e7d..9ba577ad7e 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RegisterTableProcedure.java
@@ -42,11 +42,13 @@ class RegisterTableProcedure extends BaseProcedure {
static final String NAME = "register_table";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter METADATA_FILE_PARAM =
+ requiredInParameter("metadata_file", DataTypes.StringType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("metadata_file", DataTypes.StringType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, METADATA_FILE_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -81,9 +83,11 @@ class RegisterTableProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
TableIdentifier tableName =
- Spark3Util.identifierToTableIdentifier(toIdentifier(args.getString(0),
"table"));
- String metadataFile = args.getString(1);
+ Spark3Util.identifierToTableIdentifier(
+ toIdentifier(input.asString(TABLE_PARAM), TABLE_PARAM.name()));
+ String metadataFile = input.asString(METADATA_FILE_PARAM);
Preconditions.checkArgument(
tableCatalog() instanceof HasIcebergCatalog,
"Cannot use Register Table in a non-Iceberg catalog");
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index ac3c29156e..f30f99978c 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -26,12 +26,11 @@ import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -45,7 +44,6 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
/**
* A procedure that removes orphan files in a table.
@@ -58,19 +56,40 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class);
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter OLDER_THAN_PARAM =
+ optionalInParameter("older_than", DataTypes.TimestampType);
+ private static final ProcedureParameter LOCATION_PARAM =
+ optionalInParameter("location", DataTypes.StringType);
+ private static final ProcedureParameter DRY_RUN_PARAM =
+ optionalInParameter("dry_run", DataTypes.BooleanType);
+ private static final ProcedureParameter MAX_CONCURRENT_DELETES_PARAM =
+ optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType);
+ private static final ProcedureParameter FILE_LIST_VIEW_PARAM =
+ optionalInParameter("file_list_view", DataTypes.StringType);
+ private static final ProcedureParameter EQUAL_SCHEMES_PARAM =
+ optionalInParameter("equal_schemes", STRING_MAP);
+ private static final ProcedureParameter EQUAL_AUTHORITIES_PARAM =
+ optionalInParameter("equal_authorities", STRING_MAP);
+ private static final ProcedureParameter PREFIX_MISMATCH_MODE_PARAM =
+ optionalInParameter("prefix_mismatch_mode", DataTypes.StringType);
+ // List files with prefix operations. Default is false.
+ private static final ProcedureParameter PREFIX_LISTING_PARAM =
+ optionalInParameter("prefix_listing", DataTypes.BooleanType);
+
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- optionalInParameter("older_than", DataTypes.TimestampType),
- optionalInParameter("location", DataTypes.StringType),
- optionalInParameter("dry_run", DataTypes.BooleanType),
- optionalInParameter("max_concurrent_deletes", DataTypes.IntegerType),
- optionalInParameter("file_list_view", DataTypes.StringType),
- optionalInParameter("equal_schemes", STRING_MAP),
- optionalInParameter("equal_authorities", STRING_MAP),
- optionalInParameter("prefix_mismatch_mode", DataTypes.StringType),
- // List files with prefix operations. Default is false.
- optionalInParameter("prefix_listing", DataTypes.BooleanType)
+ TABLE_PARAM,
+ OLDER_THAN_PARAM,
+ LOCATION_PARAM,
+ DRY_RUN_PARAM,
+ MAX_CONCURRENT_DELETES_PARAM,
+ FILE_LIST_VIEW_PARAM,
+ EQUAL_SCHEMES_PARAM,
+ EQUAL_AUTHORITIES_PARAM,
+ PREFIX_MISMATCH_MODE_PARAM,
+ PREFIX_LISTING_PARAM
};
private static final StructType OUTPUT_TYPE =
@@ -105,46 +124,26 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public Iterator<Scan> call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
- Long olderThanMillis = args.isNullAt(1) ? null :
DateTimeUtil.microsToMillis(args.getLong(1));
- String location = args.isNullAt(2) ? null : args.getString(2);
- boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
- Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
- String fileListView = args.isNullAt(5) ? null : args.getString(5);
+ ProcedureInput input = new ProcedureInput(spark(), tableCatalog(),
PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ Long olderThanMillis = input.asTimestampMillis(OLDER_THAN_PARAM, null);
+ String location = input.asString(LOCATION_PARAM, null);
+ boolean dryRun = input.asBoolean(DRY_RUN_PARAM, false);
+ Integer maxConcurrentDeletes = input.asInt(MAX_CONCURRENT_DELETES_PARAM,
null);
+ String fileListView = input.asString(FILE_LIST_VIEW_PARAM, null);
Preconditions.checkArgument(
maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: %s",
maxConcurrentDeletes);
- Map<String, String> equalSchemes = Maps.newHashMap();
- if (!args.isNullAt(6)) {
- args.getMap(6)
- .foreach(
- DataTypes.StringType,
- DataTypes.StringType,
- (k, v) -> {
- equalSchemes.put(k.toString(), v.toString());
- return BoxedUnit.UNIT;
- });
- }
-
- Map<String, String> equalAuthorities = Maps.newHashMap();
- if (!args.isNullAt(7)) {
- args.getMap(7)
- .foreach(
- DataTypes.StringType,
- DataTypes.StringType,
- (k, v) -> {
- equalAuthorities.put(k.toString(), v.toString());
- return BoxedUnit.UNIT;
- });
- }
+ Map<String, String> equalSchemes = input.asStringMap(EQUAL_SCHEMES_PARAM,
ImmutableMap.of());
+ Map<String, String> equalAuthorities =
+ input.asStringMap(EQUAL_AUTHORITIES_PARAM, ImmutableMap.of());
- PrefixMismatchMode prefixMismatchMode =
- args.isNullAt(8) ? null :
PrefixMismatchMode.fromString(args.getString(8));
+ PrefixMismatchMode prefixMismatchMode = asPrefixMismatchMode(input,
PREFIX_MISMATCH_MODE_PARAM);
- boolean prefixListing = args.isNullAt(9) ? false : args.getBoolean(9);
+ boolean prefixListing = input.asBoolean(PREFIX_LISTING_PARAM, false);
return withIcebergTable(
tableIdent,
@@ -236,4 +235,9 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
public String description() {
return "RemoveOrphanFilesProcedure";
}
+
+ private PrefixMismatchMode asPrefixMismatchMode(ProcedureInput input,
ProcedureParameter param) {
+ String modeAsString = input.asString(param, null);
+ return (modeAsString == null) ? null :
PrefixMismatchMode.fromString(modeAsString);
+ }
}
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index 9b9a9d0688..d4bd0764ee 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -48,12 +48,15 @@ class RewriteManifestsProcedure extends BaseProcedure {
static final String NAME = "rewrite_manifests";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter USE_CACHING_PARAM =
+ optionalInParameter("use_caching", DataTypes.BooleanType);
+ private static final ProcedureParameter SPEC_ID_PARAM =
+ optionalInParameter("spec_id", DataTypes.IntegerType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- optionalInParameter("use_caching", DataTypes.BooleanType),
- optionalInParameter("spec_id", DataTypes.IntegerType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, USE_CACHING_PARAM, SPEC_ID_PARAM};
// counts are not nullable since the action result is never null
private static final StructType OUTPUT_TYPE =
@@ -89,9 +92,10 @@ class RewriteManifestsProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
- Integer specId = args.isNullAt(2) ? null : args.getInt(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ Boolean useCaching = input.asBoolean(USE_CACHING_PARAM, null);
+ Integer specId = input.asInt(SPEC_ID_PARAM, null);
return modifyIcebergTable(
tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
index b8a5368e77..98e1e2b870 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
@@ -44,11 +44,13 @@ class RollbackToSnapshotProcedure extends BaseProcedure {
static final String NAME = "rollback_to_snapshot";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+ requiredInParameter("snapshot_id", DataTypes.LongType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("snapshot_id", DataTypes.LongType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -82,8 +84,9 @@ class RollbackToSnapshotProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    long snapshotId = args.getLong(1);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ long snapshotId = input.asLong(SNAPSHOT_ID_PARAM);
return modifyIcebergTable(
tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
index 5f7b0064c2..fd6791df28 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/RollbackToTimestampProcedure.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.spark.procedures;
import java.util.Iterator;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.iceberg.util.DateTimeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -45,11 +44,13 @@ class RollbackToTimestampProcedure extends BaseProcedure {
static final String NAME = "rollback_to_timestamp";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter TIMESTAMP_PARAM =
+ requiredInParameter("timestamp", DataTypes.TimestampType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- requiredInParameter("timestamp", DataTypes.TimestampType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, TIMESTAMP_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -83,9 +84,10 @@ class RollbackToTimestampProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
     // timestamps in Spark have microsecond precision so this conversion is lossy
-    long timestampMillis = DateTimeUtil.microsToMillis(args.getLong(1));
+    long timestampMillis = input.asTimestampMillis(TIMESTAMP_PARAM);
return modifyIcebergTable(
tableIdent,
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
index edd13fc095..18e3646cf5 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SetCurrentSnapshotProcedure.java
@@ -48,12 +48,15 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
static final String NAME = "set_current_snapshot";
+ private static final ProcedureParameter TABLE_PARAM =
+ requiredInParameter("table", DataTypes.StringType);
+ private static final ProcedureParameter SNAPSHOT_ID_PARAM =
+ optionalInParameter("snapshot_id", DataTypes.LongType);
+ private static final ProcedureParameter REF_PARAM =
+ optionalInParameter("ref", DataTypes.StringType);
+
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- requiredInParameter("table", DataTypes.StringType),
- optionalInParameter("snapshot_id", DataTypes.LongType),
- optionalInParameter("ref", DataTypes.StringType)
- };
+ new ProcedureParameter[] {TABLE_PARAM, SNAPSHOT_ID_PARAM, REF_PARAM};
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -87,9 +90,10 @@ class SetCurrentSnapshotProcedure extends BaseProcedure {
@Override
public Iterator<Scan> call(InternalRow args) {
-    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-    Long snapshotId = args.isNullAt(1) ? null : args.getLong(1);
-    String ref = args.isNullAt(2) ? null : args.getString(2);
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+ Identifier tableIdent = input.ident(TABLE_PARAM);
+ Long snapshotId = input.asLong(SNAPSHOT_ID_PARAM, null);
+ String ref = input.asString(REF_PARAM, null);
Preconditions.checkArgument(
         (snapshotId != null && ref == null) || (snapshotId == null && ref != null),
"Either snapshot_id or ref must be provided, not both");