This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch 1.10.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.10.x by this push:
     new 1852d2d75e Core: Load snapshot after it has been committed to prevent accidental cleanup of files (#15511) (#15650)
1852d2d75e is described below

commit 1852d2d75e7221ecdd3dfedf44465a56143d4747
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Mon Mar 16 15:21:47 2026 +0100

    Core: Load snapshot after it has been committed to prevent accidental cleanup of files (#15511) (#15650)
---
 .../java/org/apache/iceberg/SnapshotProducer.java  | 41 +++++++------
 .../org/apache/iceberg/TestSnapshotProducer.java   | 69 ++++++++++++++++++++++
 .../test/java/org/apache/iceberg/TestTables.java   |  2 +-
 3 files changed, 94 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 77cdac8f4a..6a43d45937 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -44,7 +44,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -422,8 +422,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public void commit() {
-    // this is always set to the latest commit attempt's snapshot
-    AtomicReference<Snapshot> stagedSnapshot = new AtomicReference<>();
+    // this is always set to the latest commit attempt's snapshot id.
+    AtomicLong newSnapshotId = new AtomicLong(-1L);
     try (Timed ignore = commitMetrics().totalDuration().start()) {
       try {
         Tasks.foreach(ops)
@@ -438,7 +438,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
             .run(
                 taskOps -> {
                   Snapshot newSnapshot = apply();
-                  stagedSnapshot.set(newSnapshot);
+                  newSnapshotId.set(newSnapshot.snapshotId());
                   TableMetadata.Builder update = TableMetadata.buildFrom(base);
                   if (base.snapshot(newSnapshot.snapshotId()) != null) {
                     // this is a rollback operation
@@ -476,22 +476,29 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
         throw e;
       }
 
-      // at this point, the commit must have succeeded so the stagedSnapshot 
is committed
-      Snapshot committedSnapshot = stagedSnapshot.get();
       try {
-        LOG.info(
-            "Committed snapshot {} ({})",
-            committedSnapshot.snapshotId(),
-            getClass().getSimpleName());
+        LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), 
getClass().getSimpleName());
+
+        // at this point, the commit must have succeeded. after a refresh, the 
snapshot is loaded by
+        // id in case another commit was added between this commit and the 
refresh.
+        // it might not be known which commit attempt succeeded in some cases, 
so this only cleans
+        // up the one that actually did succeed.
+        Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
+        if (saved != null) {
+          if (cleanupAfterCommit()) {
+            cleanUncommitted(Sets.newHashSet(saved.allManifests(ops.io())));
+          }
 
-        if (cleanupAfterCommit()) {
-          
cleanUncommitted(Sets.newHashSet(committedSnapshot.allManifests(ops.io())));
-        }
-        // also clean up unused manifest lists created by multiple attempts
-        for (String manifestList : manifestLists) {
-          if (!committedSnapshot.manifestListLocation().equals(manifestList)) {
-            deleteFile(manifestList);
+          // also clean up unused manifest lists created by multiple attempts
+          for (String manifestList : manifestLists) {
+            if (!saved.manifestListLocation().equals(manifestList)) {
+              deleteFile(manifestList);
+            }
           }
+        } else {
+          // saved may not be present if the latest metadata couldn't be 
loaded due to eventual
+          // consistency problems in refresh. in that case, don't clean up.
+          LOG.warn("Failed to load committed snapshot, skipping manifest 
clean-up");
         }
       } catch (Throwable e) {
         LOG.warn(
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index c3e238e3bc..6a64c3c48b 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -18,9 +18,15 @@
  */
 package org.apache.iceberg;
 
+import static org.apache.iceberg.TestBase.FILE_A;
+import static org.apache.iceberg.TestBase.SCHEMA;
+import static org.apache.iceberg.TestBase.SPEC;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.File;
+import java.nio.file.Paths;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 public class TestSnapshotProducer {
 
@@ -74,4 +80,67 @@ public class TestSnapshotProducer {
     int writerCount = SnapshotProducer.manifestWriterCount(workerPoolSize, 
fileCount);
     assertThat(writerCount).as(errMsg).isEqualTo(expectedManifestWriterCount);
   }
+
+  @Test
+  public void manifestNotCleanedUpWhenSnapshotNotLoadableAfterCommit(@TempDir 
File tableDir) {
+    // Uses a custom TableOps that returns stale metadata (without the new 
snapshot) on the
+    // first refresh() after commit, simulating eventual consistency. Verifies 
that commit succeeds
+    // and that the committed data is visible once the table is refreshed again
+    String tableName = "stale-table-on-first-refresh";
+    TestTables.TestTableOperations ops = 
opsWithStaleRefreshAfterCommit(tableName, tableDir);
+    TestTables.TestTable tableWithStaleRefresh =
+        TestTables.create(tableDir, tableName, SCHEMA, SPEC, 
SortOrder.unsorted(), 2, ops);
+
+    // the first refresh() after the commit will return stale metadata 
(without this snapshot), so
+    // SnapshotProducer will skip cleanup to avoid accidentally deleting files 
that are part of the
+    // committed snapshot but commit still succeeds
+    tableWithStaleRefresh.newAppend().appendFile(FILE_A).commit();
+
+    // Refresh again to get the real metadata; the snapshot must be visible now
+    tableWithStaleRefresh.ops().refresh();
+    Snapshot snapshot = tableWithStaleRefresh.currentSnapshot();
+    assertThat(snapshot)
+        .as("Committed snapshot must be visible after refresh (eventual 
consistency resolved)")
+        .isNotNull();
+
+    File metadata = Paths.get(tableDir.getPath(), "metadata").toFile();
+    assertThat(snapshot.allManifests(tableWithStaleRefresh.io()))
+        .isNotEmpty()
+        .allSatisfy(
+            manifest -> assertThat(metadata.listFiles()).contains(new 
File(manifest.path())));
+  }
+
+  /**
+   * Creates a TableOperations that returns stale metadata (without the newly 
committed snapshot) on
+   * the first refresh() after a commit. This simulates eventual consistency 
where the committed
+   * snapshot is not yet visible. Used to verify that when the snapshot cannot 
be loaded after
+   * commit, cleanup is skipped to avoid accidentally deleting files that are 
part of the committed
+   * snapshot.
+   */
+  private static TestTables.TestTableOperations opsWithStaleRefreshAfterCommit(
+      String name, File location) {
+    return new TestTables.TestTableOperations(name, location) {
+      private TableMetadata metadataToReturnOnNextRefresh;
+
+      @Override
+      public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+        super.commit(base, updatedMetadata);
+        if (base != null) {
+          // return stale metadata on the first refresh() call
+          this.metadataToReturnOnNextRefresh = base;
+        }
+      }
+
+      @Override
+      public TableMetadata refresh() {
+        if (metadataToReturnOnNextRefresh != null) {
+          this.current = metadataToReturnOnNextRefresh;
+          this.metadataToReturnOnNextRefresh = null;
+          return current;
+        }
+
+        return super.refresh();
+      }
+    };
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index 55232689ad..13c859f860 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -276,7 +276,7 @@ public class TestTables {
     private final String tableName;
     private final File metadata;
     private final FileIO fileIO;
-    private TableMetadata current = null;
+    protected TableMetadata current = null;
     private long lastSnapshotId = 0;
     private int failCommits = 0;
 

Reply via email to