This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d2551a6670 Add SnapshotUpdateValidator to validate snapshots on commit (#14509)
d2551a6670 is described below

commit d2551a6670efabec958eb97c4bc28ffbba587633
Author: Daniel Weeks <[email protected]>
AuthorDate: Thu Nov 6 17:28:27 2025 -0800

    Add SnapshotUpdateValidator to validate snapshots on commit (#14509)
    
    * Add support for snapshot ancestry validation with SnapshotUpdate
---
 .../apache/iceberg/SnapshotAncestryValidator.java  | 54 ++++++++++++++
 .../java/org/apache/iceberg/SnapshotUpdate.java    |  5 ++
 .../java/org/apache/iceberg/SnapshotProducer.java  | 34 ++++++++-
 .../org/apache/iceberg/TestSnapshotProducer.java   | 86 +++++++++++++++++++++-
 4 files changed, 177 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
new file mode 100644
index 0000000000..64b579a1a3
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/SnapshotAncestryValidator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import javax.annotation.Nonnull;
+
+/**
+ * Interface to support validating snapshot ancestry during the commit process.
+ *
+ * <p>Validation will be called after the table metadata is refreshed to pick up any changes to the
+ * table state.
+ */
+@FunctionalInterface
+public interface SnapshotAncestryValidator extends Function<Iterable<Snapshot>, Boolean> {
+
+  SnapshotAncestryValidator NON_VALIDATING = baseSnapshots -> true;
+
+  /**
+   * Validate the snapshots based on the refreshed table state.
+   *
+   * @param baseSnapshots ancestry of the base table metadata snapshots
+   * @return boolean for whether the update is valid
+   */
+  @Override
+  Boolean apply(Iterable<Snapshot> baseSnapshots);
+
+  /**
+   * Validation message that will be included when throwing {@link
+   * org.apache.iceberg.exceptions.ValidationException}
+   *
+   * @return message
+   */
+  @Nonnull
+  default String errorMessage() {
+    return "error message not provided";
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
index cc6b02dee4..73509c1538 100644
--- a/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
+++ b/api/src/main/java/org/apache/iceberg/SnapshotUpdate.java
@@ -71,4 +71,9 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
             "Cannot commit to branch %s: %s does not support branch commits",
             branch, this.getClass().getName()));
   }
+
+  default ThisT validateWith(SnapshotAncestryValidator validator) {
+    throw new UnsupportedOperationException(
+        "Snapshot validation not supported by " + this.getClass().getName());
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index d11f466434..ce02637d98 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.exceptions.CleanableFailure;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.metrics.CommitMetrics;
 import org.apache.iceberg.metrics.CommitMetricsResult;
@@ -117,6 +118,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   private TableMetadata base;
   private boolean stageOnly = false;
   private Consumer<String> deleteFunc = defaultDelete;
+  private SnapshotAncestryValidator snapshotAncestryValidator =
+      SnapshotAncestryValidator.NON_VALIDATING;
 
   private ExecutorService workerPool;
   private String targetBranch = SnapshotRef.MAIN_BRANCH;
@@ -159,6 +162,20 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
     return self();
   }
 
+  /**
+   * Set a validator to check snapshot ancestry before committing changes.
+   *
+   * <p>If there is no parent snapshot, an empty iterable will be supplied to the validator.
+   *
+   * @param validator a validator to check snapshot ancestry validity
+   * @return this for method chaining
+   */
+  @Override
+  public ThisT validateWith(SnapshotAncestryValidator validator) {
+    this.snapshotAncestryValidator = validator;
+    return self();
+  }
+
   protected TableOperations ops() {
     return ops;
   }
@@ -257,7 +274,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
     long sequenceNumber = base.nextSequenceNumber();
     Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
 
-    validate(base, parentSnapshot);
+    runValidations(parentSnapshot);
+
     List<ManifestFile> manifests = apply(base, parentSnapshot);
 
     OutputFile manifestList = manifestListPath();
@@ -327,6 +345,20 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         writer.toManifestListFile().encryptionKeyID());
   }
 
+  private void runValidations(Snapshot parentSnapshot) {
+    validate(base, parentSnapshot);
+
+    // Validate snapshot ancestry
+    Iterable<Snapshot> snapshotAncestry =
+        parentSnapshot != null
+            ? SnapshotUtil.ancestorsOf(parentSnapshot.snapshotId(), base::snapshot)
+            : List.of();
+
+    boolean valid = snapshotAncestryValidator.apply(snapshotAncestry);
+    ValidationException.check(
+        valid, "Snapshot ancestry validation failed: %s", snapshotAncestryValidator.errorMessage());
+  }
+
   protected abstract Map<String, String> summary();
 
   /** Returns the snapshot summary from the implementation and updates totals. */
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index c3e238e3bc..956242f66e 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -18,11 +18,22 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-public class TestSnapshotProducer {
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestSnapshotProducer extends TestBase {
 
   @Test
   public void testManifestFileGroupSize() {
@@ -74,4 +85,77 @@ public class TestSnapshotProducer {
     int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, fileCount);
     assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount);
   }
+
+  @TestTemplate
+  public void testCommitValidationPreventsCommit() throws IOException {
+    table.newAppend().commit();
+    String validationMessage = "Validation force failed";
+
+    // Create a CommitValidator that will reject commits
+    SnapshotAncestryValidator validator =
+        new SnapshotAncestryValidator() {
+          @Override
+          public Boolean apply(Iterable<Snapshot> baseSnapshots) {
+            return false;
+          }
+
+          @Nonnull
+          @Override
+          public String errorMessage() {
+            return validationMessage;
+          }
+        };
+
+    // Test that the validator rejects commit
+    AppendFiles append1 = table.newAppend().validateWith(validator).appendFile(FILE_A);
+    assertThatThrownBy(append1::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessage("Snapshot ancestry validation failed: " + validationMessage);
+
+    // Verify the file was not committed
+    assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(0);
+  }
+
+  @TestTemplate
+  public void testCommitValidationWithCustomSummaryProperties() throws IOException {
+    String wapId = "wap-12345-staging-audit";
+
+    // Create a validator that checks custom summary properties
+    SnapshotAncestryValidator customPropertyValidator =
+        baseSnapshots -> {
+          List<String> publishedWapIds =
+              Streams.stream(baseSnapshots)
+                  .filter(snapshot -> snapshot.summary().containsKey(PUBLISHED_WAP_ID_PROP))
+                  .map(snapshot -> snapshot.summary().get(PUBLISHED_WAP_ID_PROP))
+                  .collect(Collectors.toList());
+
+          return !publishedWapIds.contains(wapId);
+        };
+
+    // Add a file with and set a published WAP id
+    table
+        .newFastAppend()
+        .validateWith(customPropertyValidator)
+        .appendFile(FILE_A)
+        .set(PUBLISHED_WAP_ID_PROP, wapId)
+        .commit();
+
+    // Verify the current state of the table
+    assertThat(table.currentSnapshot().summary().get(PUBLISHED_WAP_ID_PROP)).isEqualTo(wapId);
+
+    // Attempt to add the same published WAP id
+    AppendFiles append2 =
+        table
+            .newFastAppend()
+            .validateWith(customPropertyValidator)
+            .appendFile(FILE_A)
+            .set(PUBLISHED_WAP_ID_PROP, wapId);
+
+    assertThatThrownBy(append2::commit)
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Snapshot ancestry validation failed");
+
+    // Verify the table wasn't updated
+    assertThat(table.snapshots()).hasSize(1);
+  }
 }

Reply via email to