This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a55d1235d2 Spark 4.1 | 4.0 | 3.5 | 3.4: Fail publish_changes procedure if there's more than one matching snapshot (#14955)
a55d1235d2 is described below
commit a55d1235d20a542b7de4cc60772dad172a414861
Author: Sam Wheating <[email protected]>
AuthorDate: Thu Jan 29 15:52:23 2026 -0800
Spark 4.1 | 4.0 | 3.5 | 3.4: Fail publish_changes procedure if there's more than one matching snapshot (#14955)
* Fail publish_changes procedure if there are multiple matching snapshots
* rewrite publish_changes procedure to early-exit on duplicated wap.id
* Update docs for publish_changes procedure
* run spotlessApply
* Update docs/docs/spark-procedures.md
* backport fix to spark 3.4, 3.5, 4.0
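The failure mode addressed here is a table holding more than one staged snapshot with the same WAP ID; the previous Iterables.find-based lookup published whichever match came first. A minimal reproduction sketch in Spark SQL (catalog, namespace, and table names are placeholders; the session property can equally be set via spark.conf().set("spark.wap.id", ...), as the new test does):

    ALTER TABLE cat.db.tbl SET TBLPROPERTIES ('write.wap.enabled' = 'true');
    SET spark.wap.id = wap_id_1;
    -- two writes staged under the same WAP ID
    INSERT INTO cat.db.tbl VALUES (1, 'a');
    INSERT INTO cat.db.tbl VALUES (2, 'b');
    -- publishing is now ambiguous, so the procedure fails instead of cherry-picking the first match
    CALL cat.system.publish_changes('db.tbl', 'wap_id_1');
    -- ValidationException: Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'
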
---
docs/docs/spark-procedures.md | 2 ++
.../extensions/TestPublishChangesProcedure.java | 20 ++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 27 ++++++++++++----------
.../extensions/TestPublishChangesProcedure.java | 20 ++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 27 ++++++++++++----------
.../extensions/TestPublishChangesProcedure.java | 20 ++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 27 ++++++++++++----------
.../extensions/TestPublishChangesProcedure.java | 20 ++++++++++++++++
.../spark/procedures/PublishChangesProcedure.java | 27 ++++++++++++----------
9 files changed, 142 insertions(+), 48 deletions(-)
diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md
index addcf87d78..c040f49ef8 100644
--- a/docs/docs/spark-procedures.md
+++ b/docs/docs/spark-procedures.md
@@ -189,6 +189,8 @@ publish_changes creates a new snapshot from an existing snapshot without alterin
Only append and dynamic overwrite snapshots can be successfully published.
+The `publish_changes` procedure will fail if there are multiple snapshots in the table with the provided `wap_id`.
+
!!! info
    This procedure invalidates all cached Spark plans that reference the affected table.
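
For reference, the documented call shape is unchanged; only the duplicate-ID case now errors out. A sketch of an ordinary invocation (catalog and table names are placeholders), which still returns the id of the cherry-picked staged snapshot and the table's new current snapshot id (source_snapshot_id and current_snapshot_id in the procedure output):

    CALL catalog_name.system.publish_changes('db.tbl', 'wap_id_1');

When more than one staged snapshot carries the given wap_id, the call instead fails with the ValidationException exercised in the tests below.
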
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 08f44c8f01..d9319801d1 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -161,6 +161,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}
+ @TestTemplate
+ public void testApplyDuplicateWapId() {
+
+ String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+ spark.conf().set("spark.wap.id", wapId);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+ }
+
@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 2c3ce7418e..a47e754153 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -18,10 +18,8 @@
*/
package org.apache.iceberg.spark.procedures;
-import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -92,21 +90,26 @@ class PublishChangesProcedure extends BaseProcedure {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
+ Snapshot matchingSnap = null;
+ for (Snapshot snap : table.snapshots()) {
+ if (wapId.equals(WapUtil.stagedWapId(snap))) {
+ if (matchingSnap != null) {
+ throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+ wapId);
+ } else {
+ matchingSnap = snap;
+ }
+ }
+ }
+
+ if (matchingSnap == null) {
            throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}
- long wapSnapshotId = wapSnapshot.get().snapshotId();
+ long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
Snapshot currentSnapshot = table.currentSnapshot();
-
          InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return new InternalRow[] {outputRow};
});
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 08f44c8f01..d9319801d1 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -161,6 +161,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}
+ @TestTemplate
+ public void testApplyDuplicateWapId() {
+
+ String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+ spark.conf().set("spark.wap.id", wapId);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+ }
+
@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 2c3ce7418e..a47e754153 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -18,10 +18,8 @@
*/
package org.apache.iceberg.spark.procedures;
-import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -92,21 +90,26 @@ class PublishChangesProcedure extends BaseProcedure {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
+ Snapshot matchingSnap = null;
+ for (Snapshot snap : table.snapshots()) {
+ if (wapId.equals(WapUtil.stagedWapId(snap))) {
+ if (matchingSnap != null) {
+ throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+ wapId);
+ } else {
+ matchingSnap = snap;
+ }
+ }
+ }
+
+ if (matchingSnap == null) {
            throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}
- long wapSnapshotId = wapSnapshot.get().snapshotId();
+ long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
Snapshot currentSnapshot = table.currentSnapshot();
-
          InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return new InternalRow[] {outputRow};
});
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 4958fde15d..c72770e1ce 100644
--- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -159,6 +159,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}
+ @TestTemplate
+ public void testApplyDuplicateWapId() {
+
+ String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+ spark.conf().set("spark.wap.id", wapId);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+ }
+
@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 8748882043..8cb0a2bfb7 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -19,10 +19,8 @@
package org.apache.iceberg.spark.procedures;
import java.util.Iterator;
-import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -97,21 +95,26 @@ class PublishChangesProcedure extends BaseProcedure {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
+ Snapshot matchingSnap = null;
+ for (Snapshot snap : table.snapshots()) {
+ if (wapId.equals(WapUtil.stagedWapId(snap))) {
+ if (matchingSnap != null) {
+ throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+ wapId);
+ } else {
+ matchingSnap = snap;
+ }
+ }
+ }
+
+ if (matchingSnap == null) {
            throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}
- long wapSnapshotId = wapSnapshot.get().snapshotId();
+ long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
Snapshot currentSnapshot = table.currentSnapshot();
-
          InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return asScanIterator(OUTPUT_TYPE, outputRow);
});
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
index 4958fde15d..c72770e1ce 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPublishChangesProcedure.java
@@ -159,6 +159,26 @@ public class TestPublishChangesProcedure extends ExtensionsTestBase {
.hasMessage("Cannot apply unknown WAP ID 'not_valid'");
}
+ @TestTemplate
+ public void testApplyDuplicateWapId() {
+
+ String wapId = "wap_id_1";
+
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, WRITE_AUDIT_PUBLISH_ENABLED);
+
+ spark.conf().set("spark.wap.id", wapId);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ assertThatThrownBy(
+            () -> sql("CALL %s.system.publish_changes('%s', '%s')", catalogName, tableIdent, wapId))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+            "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID 'wap_id_1'");
+ }
+
@TestTemplate
public void testInvalidApplyWapChangesCases() {
assertThatThrownBy(
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
index 8748882043..8cb0a2bfb7 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/procedures/PublishChangesProcedure.java
@@ -19,10 +19,8 @@
package org.apache.iceberg.spark.procedures;
import java.util.Iterator;
-import java.util.Optional;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.exceptions.ValidationException;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.WapUtil;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -97,21 +95,26 @@ class PublishChangesProcedure extends BaseProcedure {
return modifyIcebergTable(
tableIdent,
table -> {
- Optional<Snapshot> wapSnapshot =
- Optional.ofNullable(
- Iterables.find(
- table.snapshots(),
- snapshot -> wapId.equals(WapUtil.stagedWapId(snapshot)),
- null));
- if (!wapSnapshot.isPresent()) {
+ Snapshot matchingSnap = null;
+ for (Snapshot snap : table.snapshots()) {
+ if (wapId.equals(WapUtil.stagedWapId(snap))) {
+ if (matchingSnap != null) {
+ throw new ValidationException(
+                    "Cannot apply non-unique WAP ID. Found multiple snapshots with WAP ID '%s'",
+ wapId);
+ } else {
+ matchingSnap = snap;
+ }
+ }
+ }
+
+ if (matchingSnap == null) {
            throw new ValidationException("Cannot apply unknown WAP ID '%s'", wapId);
}
- long wapSnapshotId = wapSnapshot.get().snapshotId();
+ long wapSnapshotId = matchingSnap.snapshotId();
table.manageSnapshots().cherrypick(wapSnapshotId).commit();
-
Snapshot currentSnapshot = table.currentSnapshot();
-
          InternalRow outputRow = newInternalRow(wapSnapshotId, currentSnapshot.snapshotId());
return asScanIterator(OUTPUT_TYPE, outputRow);
});