This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 2731e9e9 fix(table): add AssertRefSnapshotID requirements to ExpireSnapshots (#672)
2731e9e9 is described below

commit 2731e9e9946a6ed9098ef7700c220c783cc2b220
Author: Krutika Dhananjay <[email protected]>
AuthorDate: Wed Jan 14 03:05:23 2026 +0530

    fix(table): add AssertRefSnapshotID requirements to ExpireSnapshots (#672)
    
    ExpireSnapshots now asserts that no ref's snapshot ID has changed
    concurrently while the operation was running. This prevents a race
    condition where a ref could be accidentally removed if another client
    updates it to point to a different snapshot chain after ExpireSnapshots
    has identified it as eligible for deletion.
    
    ---------
    
    Co-authored-by: Krutika Dhananjay <[email protected]>
---
 table/table_test.go  | 189 +++++++++++++++++++++++++++++++++++++++++++++++++++
 table/transaction.go |   9 ++-
 2 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/table/table_test.go b/table/table_test.go
index 437c461f..72997fef 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -970,6 +970,195 @@ func (t *TableWritingTestSuite) TestExpireSnapshots() {
        t.Require().Equal(2, len(slices.Collect(tbl.Metadata().SnapshotLogs())))
 }
 
+// validatingCatalog is a minimal in-memory catalog for tests. Before applying
+// a commit it checks every requirement against the metadata it currently
+// holds, simulating how a real catalog rejects commits built on stale state.
+// This lets tests model concurrent modifications deterministically.
+type validatingCatalog struct {
+       metadata table.Metadata
+}
+
+// LoadTable is not exercised by these tests; it always returns (nil, nil).
+func (c *validatingCatalog) LoadTable(ctx context.Context, ident table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+// CommitTable returns the first error produced by a requirement that does not
+// hold for the stored metadata, leaving that metadata untouched. Otherwise it
+// applies updates, stores the resulting metadata, and returns it together with
+// an empty metadata location.
+func (c *validatingCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
+       current := c.metadata
+       for _, r := range reqs {
+               if err := r.Validate(current); err != nil {
+                       return nil, "", err
+               }
+       }
+
+       next, err := table.UpdateTableMetadata(current, updates, "")
+       if err != nil {
+               return nil, "", err
+       }
+       c.metadata = next
+
+       return next, "", nil
+}
+
+// TestExpireSnapshotsRejectsOnRefRollback verifies that ExpireSnapshots fails
+// when a ref is rolled back to an ancestor snapshot concurrently.
+//
+// Scenario:
+//   - main -> snapshot 5 (newest), chain: 5 <- 4 <- 3 <- 2 <- 1
+//   - ExpireSnapshots calculates: keep {5, 4, 3}, delete {2, 1}
+//   - Concurrently, client rolls main -> snapshot 2
+//   - Without assertion: snapshots 2 and 1 would still be removed, even
+//     though snapshot 2 is now the snapshot main points at
+//   - With assertion: commit fails because main's snapshot ID changed, and
+//     the catalog's metadata is left unchanged
+func (t *TableWritingTestSuite) TestExpireSnapshotsRejectsOnRefRollback() {
+       fs := iceio.LocalFS{}
+
+       files := make([]string, 0, 5)
+       for i := range 5 {
+               filePath := fmt.Sprintf("%s/expire_reject_rollback_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+               t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+               files = append(files, filePath)
+       }
+
+       ident := table.Identifier{"default", "expire_reject_rollback_v" + strconv.Itoa(t.formatVersion)}
+       meta, err := table.NewMetadata(t.tableSchemaPromotedTypes, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, t.location, iceberg.Properties{table.PropertyFormatVersion: strconv.Itoa(t.formatVersion)})
+       t.Require().NoError(err)
+
+       ctx := context.Background()
+       cat := &validatingCatalog{meta}
+
+       tbl := table.New(
+               ident,
+               meta,
+               t.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return fs, nil
+               },
+               cat,
+       )
+
+       // Create 5 snapshots, one per data file.
+       for i := range 5 {
+               tx := tbl.NewTransaction()
+               t.Require().NoError(tx.AddFiles(ctx, files[i:i+1], nil, false))
+               tbl, err = tx.Commit(ctx)
+               t.Require().NoError(err)
+       }
+       t.Require().Equal(5, len(tbl.Metadata().Snapshots()))
+
+       // Second snapshot created (index 1; assumes Snapshots() is oldest-first).
+       snapshots := tbl.Metadata().Snapshots()
+       snapshot2 := snapshots[1]
+
+       // Start ExpireSnapshots while main -> snapshot 5. The keep/delete sets
+       // are computed now, against metadata that is about to become stale.
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.ExpireSnapshots(table.WithOlderThan(0), table.WithRetainLast(3)))
+
+       // Simulate another client rolling main back to snapshot 2 before the
+       // ExpireSnapshots transaction commits. CommitTable stores the result in
+       // cat.metadata itself, so the returned metadata is not needed here.
+       rollbackUpdates := []table.Update{
+               table.NewSetSnapshotRefUpdate("main", snapshot2.SnapshotID, table.BranchRef, -1, -1, -1),
+       }
+       _, _, err = cat.CommitTable(ctx, ident, nil, rollbackUpdates)
+       t.Require().NoError(err)
+
+       // The ExpireSnapshots commit must be rejected by AssertRefSnapshotID.
+       _, err = tx.Commit(ctx)
+       t.Require().Error(err)
+       t.Require().Contains(err.Error(), "requirement failed")
+       t.Require().Contains(err.Error(), "main")
+
+       // A rejected commit must not modify the catalog: nothing was expired.
+       t.Require().Len(cat.metadata.Snapshots(), 5)
+}
+
+// TestExpireSnapshotsRejectsOnRefUpdate verifies that ExpireSnapshots fails
+// when a ref eligible for deletion is concurrently updated to a newer snapshot.
+//
+// Scenario:
+//   - expiring-tag -> oldest snapshot with maxRefAgeMs = 1, so it is eligible
+//     for deletion
+//   - ExpireSnapshots decides to remove expiring-tag
+//   - Concurrently, a client re-points expiring-tag at the newest snapshot
+//     without a max ref age (no longer eligible)
+//   - Without assertion: expiring-tag would be deleted despite being updated
+//   - With assertion: commit fails because expiring-tag's snapshot ID changed
+func (t *TableWritingTestSuite) TestExpireSnapshotsRejectsOnRefUpdate() {
+       fs := iceio.LocalFS{}
+       fsF := func(context.Context) (iceio.IO, error) {
+               return fs, nil
+       }
+
+       files := make([]string, 0, 3)
+       for i := range 3 {
+               filePath := fmt.Sprintf("%s/expire_reject_update_v%d/data-%d.parquet", t.location, t.formatVersion, i)
+               t.writeParquet(fs, filePath, t.arrTablePromotedTypes)
+               files = append(files, filePath)
+       }
+
+       ident := table.Identifier{"default", "expire_reject_update_v" + strconv.Itoa(t.formatVersion)}
+       meta, err := table.NewMetadata(t.tableSchemaPromotedTypes, iceberg.UnpartitionedSpec,
+               table.UnsortedSortOrder, t.location, iceberg.Properties{table.PropertyFormatVersion: strconv.Itoa(t.formatVersion)})
+       t.Require().NoError(err)
+
+       ctx := context.Background()
+       cat := &validatingCatalog{meta}
+
+       tbl := table.New(ident, meta, t.getMetadataLoc(), fsF, cat)
+
+       // Create 3 snapshots, one per data file.
+       for i := range 3 {
+               tx := tbl.NewTransaction()
+               t.Require().NoError(tx.AddFiles(ctx, files[i:i+1], nil, false))
+               tbl, err = tx.Commit(ctx)
+               t.Require().NoError(err)
+       }
+       t.Require().Equal(3, len(tbl.Metadata().Snapshots()))
+
+       snapshots := tbl.Metadata().Snapshots()
+       oldSnapshot := snapshots[0]   // Oldest snapshot
+       newerSnapshot := snapshots[2] // Newest snapshot
+
+       // Tag the oldest snapshot with a 1ms max ref age so that the tag becomes
+       // eligible for deletion almost immediately. CommitTable stores the
+       // result in cat.metadata itself.
+       maxRefAgeMs := int64(1)
+       tagUpdates := []table.Update{
+               table.NewSetSnapshotRefUpdate("expiring-tag", oldSnapshot.SnapshotID, table.TagRef, maxRefAgeMs, -1, -1),
+       }
+       _, _, err = cat.CommitTable(ctx, ident, nil, tagUpdates)
+       t.Require().NoError(err)
+
+       // Reload the table so the transaction below sees the tag.
+       tbl = table.New(ident, cat.metadata, t.getMetadataLoc(), fsF, cat)
+
+       // Wait so the tag's ref age is guaranteed to exceed maxRefAgeMs.
+       time.Sleep(10 * time.Millisecond)
+
+       // Start ExpireSnapshots; it identifies expiring-tag as eligible for deletion.
+       tx := tbl.NewTransaction()
+       t.Require().NoError(tx.ExpireSnapshots(table.WithOlderThan(time.Hour), table.WithRetainLast(1)))
+
+       // Simulate another client re-pointing the tag at the newest snapshot.
+       // The max ref age is left unset (-1, as for the branch update in the
+       // rollback test): reusing the 1ms value would keep the tag age-eligible
+       // and contradict the scenario under test.
+       updateTagUpdates := []table.Update{
+               table.NewSetSnapshotRefUpdate("expiring-tag", newerSnapshot.SnapshotID, table.TagRef, -1, -1, -1),
+       }
+       _, _, err = cat.CommitTable(ctx, ident, nil, updateTagUpdates)
+       t.Require().NoError(err)
+
+       // The ExpireSnapshots commit must be rejected by AssertRefSnapshotID.
+       _, err = tx.Commit(ctx)
+       t.Require().Error(err)
+       t.Require().Contains(err.Error(), "requirement failed")
+       t.Require().Contains(err.Error(), "expiring-tag")
+
+       // A rejected commit must not modify the catalog: nothing was expired.
+       t.Require().Len(cat.metadata.Snapshots(), 3)
+}
+
 func (t *TableWritingTestSuite) TestWriteSpecialCharacterColumn() {
        ident := table.Identifier{"default", "write_special_character_column"}
        colNameWithSpecialChar := "letter/abc"
diff --git a/table/transaction.go b/table/transaction.go
index d949df3a..b2eccbb6 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -188,6 +188,7 @@ func (t *Transaction) ExpireSnapshots(opts 
...ExpireSnapshotsOpt) error {
        var (
                cfg         expireSnapshotsCfg
                updates     []Update
+               reqs        []Requirement
                snapsToKeep = make(map[int64]struct{})
                nowMs       = time.Now().UnixMilli()
        )
@@ -197,6 +198,12 @@ func (t *Transaction) ExpireSnapshots(opts 
...ExpireSnapshotsOpt) error {
        }
 
        for refName, ref := range t.meta.refs {
+               // Assert that this ref's snapshot ID hasn't changed 
concurrently.
+               // This ensures we don't accidentally expire snapshots that are 
now
+               // referenced by updated refs.
+               snapshotID := ref.SnapshotID
+               reqs = append(reqs, AssertRefSnapshotID(refName, &snapshotID))
+
                if refName == MainBranch {
                        snapsToKeep[ref.SnapshotID] = struct{}{}
                }
@@ -270,7 +277,7 @@ func (t *Transaction) ExpireSnapshots(opts 
...ExpireSnapshotsOpt) error {
 
        updates = append(updates, NewRemoveSnapshotsUpdate(snapsToDelete))
 
-       return t.apply(updates, nil)
+       return t.apply(updates, reqs)
 }
 
 func (t *Transaction) AppendTable(ctx context.Context, tbl arrow.Table, 
batchSize int64, snapshotProps iceberg.Properties) error {

Reply via email to