This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 56c1993ea Spark: Add procedure to publish WAP changes using wap.id (#4715)
56c1993ea is described below
commit 56c1993ea9e557e7f1a25614765cadc15a74497e
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Wed Jul 6 17:25:52 2022 -0400
Spark: Add procedure to publish WAP changes using wap.id (#4715)
---
.../extensions/TestPublishChangesProcedure.java | 176 +++++++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 108 +++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
.../extensions/TestPublishChangesProcedure.java | 176 +++++++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 108 +++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
.../extensions/TestPublishChangesProcedure.java | 176 +++++++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 108 +++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
9 files changed, 855 insertions(+)
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+/**
+ * Tests for the {@code publish_changes} stored procedure, which publishes a staged Write-Audit-Publish snapshot
+ * identified by its WAP ID.
+ */
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    // Several tests below set spark.wap.id on the session; clear it so it cannot leak into later
+    // tests if the session is reused (assumed: the test base shares one SparkSession -- confirm).
+    spark.conf().unset("spark.wap.id");
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    try {
+      assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+      spark.conf().set("spark.wap.id", wapId);
+
+      sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+      assertEquals("Should not see rows from staged snapshot",
+          ImmutableList.of(),
+          sql("SELECT * FROM %s", tableName));
+
+      sql("CALL %s.system.publish_changes('%s', '%s')",
+          catalogName, tableIdent, wapId);
+
+      // The cached view must reflect the published snapshot, i.e. the procedure refreshed the cache.
+      assertEquals("Apply of WAP changes should be visible",
+          ImmutableList.of(row(1L, "a")),
+          sql("SELECT * FROM tmp"));
+    } finally {
+      // Release the cached view even when an assertion above fails.
+      sql("UNCACHE TABLE tmp");
+    }
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..a7d8b344a
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Cherry-picks the snapshot staged under {@code wap_id} onto the table.
+   *
+   * @param args row holding the table identifier (ordinal 0) and the WAP ID (ordinal 1)
+   * @return a single row of (source_snapshot_id, current_snapshot_id)
+   * @throws ValidationException if no snapshot of the table carries the given staged WAP ID
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      // First snapshot (in table.snapshots() order) whose staged WAP ID matches the requested one.
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+
+      // Pass template and argument separately: ValidationException applies String.format itself, so a
+      // pre-formatted message would have any '%' in the user-supplied WAP ID re-parsed as a format specifier.
+      long wapSnapshotId = wapSnapshot
+          .orElseThrow(() -> new ValidationException("Cannot apply unknown WAP ID '%s'", wapId))
+          .snapshotId();
+
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..d481c19d5 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
mapBuilder.put("add_files", AddFilesProcedure::builder);
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+ mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
return mapBuilder.build();
}
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+/**
+ * Tests for the {@code publish_changes} stored procedure, which publishes a staged Write-Audit-Publish snapshot
+ * identified by its WAP ID.
+ */
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    // Several tests below set spark.wap.id on the session; clear it so it cannot leak into later
+    // tests if the session is reused (assumed: the test base shares one SparkSession -- confirm).
+    spark.conf().unset("spark.wap.id");
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    try {
+      assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+      spark.conf().set("spark.wap.id", wapId);
+
+      sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+      assertEquals("Should not see rows from staged snapshot",
+          ImmutableList.of(),
+          sql("SELECT * FROM %s", tableName));
+
+      sql("CALL %s.system.publish_changes('%s', '%s')",
+          catalogName, tableIdent, wapId);
+
+      // The cached view must reflect the published snapshot, i.e. the procedure refreshed the cache.
+      assertEquals("Apply of WAP changes should be visible",
+          ImmutableList.of(row(1L, "a")),
+          sql("SELECT * FROM tmp"));
+    } finally {
+      // Release the cached view even when an assertion above fails.
+      sql("UNCACHE TABLE tmp");
+    }
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..a7d8b344a
--- /dev/null
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Cherry-picks the snapshot staged under {@code wap_id} onto the table.
+   *
+   * @param args row holding the table identifier (ordinal 0) and the WAP ID (ordinal 1)
+   * @return a single row of (source_snapshot_id, current_snapshot_id)
+   * @throws ValidationException if no snapshot of the table carries the given staged WAP ID
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      // First snapshot (in table.snapshots() order) whose staged WAP ID matches the requested one.
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+
+      // Pass template and argument separately: ValidationException applies String.format itself, so a
+      // pre-formatted message would have any '%' in the user-supplied WAP ID re-parsed as a format specifier.
+      long wapSnapshotId = wapSnapshot
+          .orElseThrow(() -> new ValidationException("Cannot apply unknown WAP ID '%s'", wapId))
+          .snapshotId();
+
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..d481c19d5 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
mapBuilder.put("add_files", AddFilesProcedure::builder);
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+ mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
return mapBuilder.build();
}
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+/**
+ * Tests for the {@code publish_changes} stored procedure, which publishes a staged Write-Audit-Publish snapshot
+ * identified by its WAP ID.
+ */
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    // Several tests below set spark.wap.id on the session; clear it so it cannot leak into later
+    // tests if the session is reused (assumed: the test base shares one SparkSession -- confirm).
+    spark.conf().unset("spark.wap.id");
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    try {
+      assertEquals("View should not produce rows", ImmutableList.of(), sql("SELECT * FROM tmp"));
+
+      spark.conf().set("spark.wap.id", wapId);
+
+      sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+      assertEquals("Should not see rows from staged snapshot",
+          ImmutableList.of(),
+          sql("SELECT * FROM %s", tableName));
+
+      sql("CALL %s.system.publish_changes('%s', '%s')",
+          catalogName, tableIdent, wapId);
+
+      // The cached view must reflect the published snapshot, i.e. the procedure refreshed the cache.
+      assertEquals("Apply of WAP changes should be visible",
+          ImmutableList.of(row(1L, "a")),
+          sql("SELECT * FROM tmp"));
+    } finally {
+      // Release the cached view even when an assertion above fails.
+      sql("UNCACHE TABLE tmp");
+    }
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", catalogName));
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..e86d698e1
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that publishes the changes of a snapshot staged within a Write-Audit-Publish workflow under a given
+ * wap_id, by cherry-picking that snapshot so that the result becomes the table's current snapshot.
+ * <p>
+ * Parameters: {@code table} (required) names the table; {@code wap_id} (required) is the WAP ID of the staged
+ * snapshot to publish. The single output row holds {@code source_snapshot_id}, the ID of the staged snapshot that
+ * was cherry-picked, and {@code current_snapshot_id}, the table's current snapshot ID after the commit.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Cherry-picks the snapshot whose staged WAP ID equals {@code wap_id} and reports the resulting snapshot IDs.
+   *
+   * @param args procedure arguments in the order declared by {@link #parameters()}: table, wap_id
+   * @return a single row of (source_snapshot_id, current_snapshot_id)
+   * @throws ValidationException if no snapshot of the table has a staged WAP ID equal to {@code wap_id}
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      // First snapshot whose staged WAP ID (as reported by WapUtil.stagedWapId) matches the requested one.
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+
+      // Pass wapId as a format argument rather than pre-formatting the message: ValidationException applies
+      // String.format to its message itself, so a '%' in a user-supplied WAP ID would otherwise be parsed as a
+      // format specifier and surface as an unrelated formatting exception instead of this validation error.
+      long wapSnapshotId = wapSnapshot
+          .orElseThrow(() -> new ValidationException("Cannot apply unknown WAP ID '%s'", wapId))
+          .snapshotId();
+
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      // Read after the commit so the reported ID reflects the published state.
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    // Matches the class name, as the procedure is registered as "publish_changes".
+    return "PublishChangesProcedure";
+  }
+}
diff --git
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 1e944fcf0..a7c036e1c 100644
---
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -54,6 +54,7 @@ public class SparkProcedures {
mapBuilder.put("add_files", AddFilesProcedure::builder);
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
mapBuilder.put("register_table", RegisterTableProcedure::builder);
+ mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
return mapBuilder.build();
}