This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 56c1993ea Spark: Add procedure to publish WAP changes using wap.id 
(#4715)
56c1993ea is described below

commit 56c1993ea9e557e7f1a25614765cadc15a74497e
Author: Edgar Rodriguez <[email protected]>
AuthorDate: Wed Jul 6 17:25:52 2022 -0400

    Spark: Add procedure to publish WAP changes using wap.id (#4715)
---
 .../extensions/TestPublishChangesProcedure.java    | 176 +++++++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 108 +++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 .../extensions/TestPublishChangesProcedure.java    | 176 +++++++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 108 +++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 .../extensions/TestPublishChangesProcedure.java    | 176 +++++++++++++++++++++
 .../spark/procedures/PublishChangesProcedure.java  | 108 +++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 9 files changed, 855 insertions(+)

diff --git 
a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
 
b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ 
b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 
1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), 
sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", 
catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be 
mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 
'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary 
namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", 
catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table 
identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", 
catalogName));
+  }
+}
diff --git 
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
 
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..a7d8b344a
--- /dev/null
+++ 
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a 
Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that 
reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new 
ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, 
Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, 
Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> 
wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+      if (!wapSnapshot.isPresent()) {
+        throw new ValidationException(String.format("Cannot apply unknown WAP 
ID '%s'", wapId));
+      }
+
+      long wapSnapshotId = wapSnapshot.get().snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, 
currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git 
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
 
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..d481c19d5 100644
--- 
a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ 
b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     return mapBuilder.build();
   }
 
diff --git 
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ 
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 
1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), 
sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", 
catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be 
mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 
'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary 
namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", 
catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table 
identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", 
catalogName));
+  }
+}
diff --git 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..a7d8b344a
--- /dev/null
+++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a 
Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that 
reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new 
ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new 
StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, 
Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, 
Metadata.empty())
+  });
+
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      Optional<Snapshot> wapSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> 
wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+      if (!wapSnapshot.isPresent()) {
+        throw new ValidationException(String.format("Cannot apply unknown WAP 
ID '%s'", wapId));
+      }
+
+      long wapSnapshotId = wapSnapshot.get().snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, 
currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    return "PublishChangesProcedure";
+  }
+}
diff --git 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 4ce9460b9..d481c19d5 100644
--- 
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ 
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     return mapBuilder.build();
   }
 
diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
new file mode 100644
index 000000000..f8080818a
--- /dev/null
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.apache.iceberg.TableProperties.WRITE_AUDIT_PUBLISH_ENABLED;
+
+public class TestPublishChangesProcedure extends SparkExtensionsTestBase {
+
+  public TestPublishChangesProcedure(String catalogName, String 
implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testApplyWapChangesUsingPositionalArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesUsingNamedArgs() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot wapSnapshot = Iterables.getOnlyElement(table.snapshots());
+
+    List<Object[]> output = sql(
+        "CALL %s.system.publish_changes(wap_id => '%s', table => '%s')",
+        catalogName, wapId, tableIdent);
+
+    table.refresh();
+
+    Snapshot currentSnapshot = table.currentSnapshot();
+
+    assertEquals("Procedure output must match",
+        ImmutableList.of(row(wapSnapshot.snapshotId(), 
currentSnapshot.snapshotId())),
+        output);
+
+    assertEquals("Apply of WAP changes must be successful",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM %s", tableName));
+  }
+
+  @Test
+  public void testApplyWapChangesRefreshesRelationCache() {
+    String wapId = "wap_id_1";
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, 
WRITE_AUDIT_PUBLISH_ENABLED);
+
+    Dataset<Row> query = spark.sql("SELECT * FROM " + tableName + " WHERE id = 
1");
+    query.createOrReplaceTempView("tmp");
+
+    spark.sql("CACHE TABLE tmp");
+
+    assertEquals("View should not produce rows", ImmutableList.of(), 
sql("SELECT * FROM tmp"));
+
+    spark.conf().set("spark.wap.id", wapId);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    assertEquals("Should not see rows from staged snapshot",
+        ImmutableList.of(),
+        sql("SELECT * FROM %s", tableName));
+
+    sql("CALL %s.system.publish_changes('%s', '%s')",
+        catalogName, tableIdent, wapId);
+
+    assertEquals("Apply of WAP changes should be visible",
+        ImmutableList.of(row(1L, "a")),
+        sql("SELECT * FROM tmp"));
+
+    sql("UNCACHE TABLE tmp");
+  }
+
+  @Test
+  public void testApplyInvalidWapId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+
+    AssertHelpers.assertThrows("Should reject invalid wap id",
+        ValidationException.class, "Cannot apply unknown WAP ID",
+        () -> sql("CALL %s.system.publish_changes('%s', 'not_valid')", 
catalogName, tableIdent));
+  }
+
+  @Test
+  public void testInvalidApplyWapChangesCases() {
+    AssertHelpers.assertThrows("Should not allow mixed args",
+        AnalysisException.class, "Named and positional arguments cannot be 
mixed",
+        () -> sql("CALL %s.system.publish_changes('n', table => 't', 
'not_valid')", catalogName));
+
+    AssertHelpers.assertThrows("Should not resolve procedures in arbitrary 
namespaces",
+        NoSuchProcedureException.class, "not found",
+        () -> sql("CALL %s.custom.publish_changes('n', 't', 'not_valid')", 
catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.publish_changes('t')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table 
identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier",
+        () -> sql("CALL %s.system.publish_changes('', 'not_valid')", 
catalogName));
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
new file mode 100644
index 000000000..e86d698e1
--- /dev/null
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Optional;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
+import org.apache.iceberg.util.WapUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that applies changes in a snapshot created within a Write-Audit-Publish workflow with a wap_id and
+ * creates a new snapshot which will be set as the current snapshot in a table.
+ * <p>
+ * The staged snapshot is located by comparing the requested wap_id with the staged WAP ID of each table snapshot
+ * and is then published through {@code ManageSnapshots#cherrypick(long)}.
+ * <p>
+ * <em>Note:</em> this procedure invalidates all cached Spark plans that reference the affected table.
+ *
+ * @see org.apache.iceberg.ManageSnapshots#cherrypick(long)
+ */
+class PublishChangesProcedure extends BaseProcedure {
+
+  // Order matters: call() reads "table" from position 0 and "wap_id" from position 1.
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("wap_id", DataTypes.StringType)
+  };
+
+  // One output row: the staged snapshot that was published and the table's current snapshot afterwards.
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
+      new StructField("source_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  });
+
+  /**
+   * Returns a builder that creates this procedure for the table catalog configured on the builder.
+   */
+  public static ProcedureBuilder builder() {
+    return new Builder<PublishChangesProcedure>() {
+      @Override
+      protected PublishChangesProcedure doBuild() {
+        return new PublishChangesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Instances are only created through builder().
+  private PublishChangesProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Publishes the snapshot staged under the given wap_id by cherry-picking it into the table.
+   *
+   * @param args the procedure arguments in {@code PARAMETERS} order: the table identifier, then the wap_id
+   * @return a single row holding the staged snapshot ID and the table's current snapshot ID after the commit
+   * @throws ValidationException if no snapshot of the table was staged with the given wap_id
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    String wapId = args.getString(1);
+
+    return modifyIcebergTable(tableIdent, table -> {
+      // Iterables.find falls back to the null default when no snapshot carries this wap_id
+      Optional<Snapshot> stagedSnapshot = Optional.ofNullable(
+          Iterables.find(table.snapshots(), snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)), null));
+
+      // the exception formats its own message; pre-formatting it would re-interpret any '%' in the wap_id
+      Snapshot wapSnapshot = stagedSnapshot
+          .orElseThrow(() -> new ValidationException("Cannot apply unknown WAP ID '%s'", wapId));
+
+      long wapSnapshotId = wapSnapshot.snapshotId();
+      table.manageSnapshots()
+          .cherrypick(wapSnapshotId)
+          .commit();
+
+      // read after the commit so the output reflects the table state once the changes are published
+      Snapshot currentSnapshot = table.currentSnapshot();
+
+      InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
+      return new InternalRow[]{outputRow};
+    });
+  }
+
+  @Override
+  public String description() {
+    // keep in sync with the class name
+    return "PublishChangesProcedure";
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 1e944fcf0..a7c036e1c 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -54,6 +54,7 @@ public class SparkProcedures {
     mapBuilder.put("add_files", AddFilesProcedure::builder);
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
+    mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     return mapBuilder.build();
   }
 

Reply via email to